/*-------------------------------------------------------------------------
 *
 * copy.c
 *        Implements the COPY utility command
 *
 * Portions Copyright (c) 2012-2014, TransLattice, Inc.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This source code file contains modifications made by THL A29 Limited ("Tencent Modifications").
 * All Tencent Modifications are Copyright (C) 2023 THL A29 Limited.
 *
 * IDENTIFICATION
 *      src/backend/commands/copy.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef XCP
#include "pgstat.h"
#endif

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/pg_type.h"
#ifdef XCP
#include "catalog/dependency.h"
#include "commands/sequence.h"
#endif
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/remotecopy.h"
#include "nodes/nodes.h"
#include "pgxc/poolmgr.h"
#include "catalog/pgxc_node.h"
#endif
#include "nodes/makefuncs.h"
#include "optimizer/pgxcship.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#ifdef __OPENTENBASE__
#include "utils/rel.h"
#include "utils/ruleutils.h"
#endif
#ifdef __STORAGE_SCALABLE__
#include "replication/logical_statistic.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#endif

#ifdef _MLS_
#include "utils/mls.h"
#include "utils/datamask.h"
#include "commands/relcryptcommand.h"
#endif
#ifdef __COLD_HOT__
#include "pgxc/shardmap.h"
#endif

/*
 * Character helpers for octal digits.
 *
 * ISOCTAL(c)  is nonzero when c lies in the range '0'..'7'.
 * OCTVALUE(c) is the distance of c from '0', i.e. the digit's numeric value.
 *
 * ISOCTAL expands its argument twice, so do not pass an expression with
 * side effects.
 */
#define ISOCTAL(c)   ((c) >= '0' && (c) <= '7')
#define OCTVALUE(c)  ((c) - '0')

/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level.
 *
 * The value is stored in CopyStateData.copy_dest.  The low-level routines
 * CopySendEndOfRow() and CopyGetData() switch on it to decide how bytes are
 * physically written or read.
 */
typedef enum CopyDest
{
    COPY_FILE,                    /* to/from file (or a piped program) */
    COPY_OLD_FE,                /* to/from frontend (2.0 protocol) */
    COPY_NEW_FE,                    /* to/from frontend (3.0 protocol) */
#ifdef PGXC
    COPY_BUFFER,                /* Do not send, just prepare: the row is left
                                 * in fe_msgbuf by CopySendEndOfRow(); reading
                                 * in this mode is rejected by CopyGetData() */
#endif
    COPY_CALLBACK                /* to/from callback function; reads go through
                                 * data_source_cb, sending is not supported */
} CopyDest;

/*
 *    Represents the end-of-line terminator type of the input
 */
typedef enum EolType
{
    EOL_UNKNOWN,
    EOL_NL,
    EOL_CR,
    EOL_CRNL
} EolType;

/*
 * This struct contains all the state variables used throughout a COPY
 * operation. For simplicity, we use the same struct for all variants of COPY,
 * even though some fields are used in only some cases.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
 * when we have to do it the hard way.
 *
 * Several fields exist only under build-time switches (PGXC, _SHARDING_,
 * __OPENTENBASE__); their users are mostly outside this part of the file, so
 * the notes on them below are marked as assumptions where they could not be
 * confirmed here.
 */
typedef struct CopyStateData
{
    /* low-level state data */
    CopyDest    copy_dest;        /* type of copy source/destination */
    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO, only for
                                 * dest == COPY_NEW_FE in COPY FROM */
    bool        fe_eof;            /* true if detected end of copy data */
    EolType        eol_type;        /* EOL type of input */
    int            file_encoding;    /* file or remote side's character encoding */
    bool        need_transcoding;    /* file encoding diff from server? */
    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */

    /* parameters from the COPY command */
    Relation    rel;            /* relation to copy to or from */
    QueryDesc  *queryDesc;        /* executable query to copy from */
    List       *attnumlist;        /* integer list of attnums to copy */
    char       *filename;        /* filename, or NULL for STDIN/STDOUT */
    bool        is_program;        /* is 'filename' a program to popen? */
    copy_data_source_cb data_source_cb; /* function for reading data */
    bool        binary;            /* binary format? */
    bool        oids;            /* include OIDs? */
    bool        freeze;            /* freeze rows on loading? */
    bool        csv_mode;        /* Comma Separated Value format? */
#ifdef __OPENTENBASE__
    bool        internal_mode;  /* internal format? */
#endif
    bool        header_line;    /* CSV header line? */
    char       *null_print;        /* NULL marker string (server encoding!) */
    int            null_print_len; /* length of same */
    char       *null_print_client;    /* same converted to file encoding */
    char       *delim;            /* column delimiter (must be 1 byte) */
    char       *quote;            /* CSV quote char (must be 1 byte) */
    char       *escape;            /* CSV escape char (must be 1 byte) */
    List       *force_quote;    /* list of column names */
    bool        force_quote_all;    /* FORCE_QUOTE *? */
    bool       *force_quote_flags;    /* per-column CSV FQ flags */
    List       *force_notnull;    /* list of column names */
    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
    List       *force_null;        /* list of column names */
    bool       *force_null_flags;    /* per-column CSV FN flags */
    bool        convert_selectively;    /* do selective binary conversion? */
    List       *convert_select; /* list of column names (can be NIL) */
    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */

    /* these are just for error messages, see CopyFromErrorCallback */
    const char *cur_relname;    /* table name for error messages */
    int            cur_lineno;        /* line number for error messages */
    const char *cur_attname;    /* current att for error messages */
    const char *cur_attval;        /* current att value for error messages */

    /*
     * Working state for COPY TO/FROM
     */
    MemoryContext copycontext;    /* per-copy execution context */

    /*
     * Working state for COPY TO
     */
    FmgrInfo   *out_functions;    /* lookup info for output functions */
    MemoryContext rowcontext;    /* per-row evaluation context */

    /*
     * Working state for COPY FROM
     *
     * NOTE(review): the fields without a trailing comment are set and used
     * outside this section; the meanings suggested by their names
     * (num_defaults = entries in defmap/defexprs, file_has_oids = input
     * carries an OID column, oid_in_function/oid_typioparam = input function
     * data for that column) should be confirmed against BeginCopyFrom.
     */
    AttrNumber    num_defaults;
    bool        file_has_oids;
    FmgrInfo    oid_in_function;
    Oid            oid_typioparam;
    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
    Oid           *typioparams;    /* array of element types for in_functions */
    int           *defmap;            /* array of default att numbers */
    ExprState **defexprs;        /* array of default att expressions */
    bool        volatile_defexprs;    /* is any of defexprs volatile? */
    List       *range_table;    /* NOTE(review): presumably the range table
                                 * built for the target relation -- confirm */

	/* Tuple-routing support info */
	PartitionTupleRouting *partition_tuple_routing;

    /* NOTE(review): presumably trigger transition-table capture state -- confirm */
    TransitionCaptureState *transition_capture;

    /*
     * These variables are used to reduce overhead in textual COPY FROM.
     *
     * attribute_buf holds the separated, de-escaped text for each field of
     * the current line.  The CopyReadAttributes functions return arrays of
     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
     * the buffer on each cycle.
     */
    StringInfoData attribute_buf;

    /*
     * field raw data pointers found by COPY FROM
     *
     * NOTE(review): max_fields is presumably the allocated length of the
     * raw_fields array -- confirm against the CopyReadAttributes functions.
     */

    int            max_fields;
    char      **raw_fields;

    /*
     * Similarly, line_buf holds the whole input line being processed. The
     * input cycle is first to read the whole line into line_buf, convert it
     * to server encoding there, and then extract the individual attribute
     * fields into attribute_buf.  line_buf is preserved unmodified so that we
     * can display it in error messages if appropriate.
     */
    StringInfoData line_buf;
    bool        line_buf_converted; /* converted to server encoding? */
    bool        line_buf_valid; /* contains the row being processed? */

    /*
     * Finally, raw_buf holds raw data read from the data source (file or
     * client connection).  CopyReadLine parses this data sufficiently to
     * locate line boundaries, then transfers the data to line_buf and
     * converts it.  Note: we guarantee that there is a \0 at
     * raw_buf[raw_buf_len].
     */
#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
    char       *raw_buf;
    int            raw_buf_index;    /* next byte to process */
    int            raw_buf_len;    /* total # of bytes stored */
#ifdef PGXC
    /* Remote COPY state data */
    RemoteCopyData *remoteCopyState;
#endif

#ifdef _SHARDING_
    /*
     * NOTE(review): presumably the set of shards this COPY is restricted to
     * (BeginCopy/BeginCopyTo accept a "shards" Bitmapset) -- confirm.
     */
    Bitmapset * shard_array;
#endif
#ifdef __OPENTENBASE__
    /*
     * OpenTenBase extensions.
     *
     * NOTE(review): partrels/nparts look like an array of partition
     * relations and its length; insert_into presumably mirrors
     * CopyStmt.insert_into (COPY generated from a multi-row INSERT); and
     * data_list/data_index/data_ncolumns/ndatarows presumably describe the
     * in-memory rows supplied in that case.  None of this is visible in this
     * section -- confirm before relying on it.
     */
    Relation    *partrels;
    int            nparts;
    bool        insert_into;
    int        data_index;
    char       ***data_list;
    int        data_ncolumns;
    int        ndatarows;
    bool       whole_line;        /* maintained by CopyGetData: reset to false on
                                 * every call; for COPY_NEW_FE reads, set true
                                 * when the unread remainder of the current
                                 * frontend message fit within the request */
#endif
} CopyStateData;

/*
 * DestReceiver for COPY (query) TO
 *
 * Extends the generic DestReceiver (which must stay the first member so the
 * struct can be passed where a DestReceiver is expected) with the COPY state
 * and a running tuple count.
 */
typedef struct
{
    DestReceiver pub;            /* publicly-known function pointers */
    CopyState    cstate;            /* CopyStateData for the command */
    uint64        processed;        /* # of tuples processed */
} DR_copy;


/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working.  We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * If looking (extralen) bytes beyond raw_buf_ptr would reach or pass
 * copy_buf_len and EOF has not been seen, the macro rewinds raw_buf_ptr to
 * prev_raw_ptr, sets need_data, and executes "continue" in the enclosing
 * loop.
 *
 * The expansion uses the caller's locals raw_buf_ptr, prev_raw_ptr,
 * copy_buf_len, hit_eof and need_data, so it can only be used inside a loop
 * where all of those are in scope.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
    if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
    { \
        raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
        need_data = true; \
        continue; \
    } \
} else ((void) 0)

/*
 * This consumes the remainder of the buffer and breaks.
 *
 * Counterpart of IF_NEED_REFILL_AND_NOT_EOF_CONTINUE for the case where EOF
 * has already been hit: when (extralen) more bytes are not available, set
 * result = true and "break" out of the enclosing loop or switch.  If extralen
 * is nonzero, raw_buf_ptr is first advanced to copy_buf_len.
 *
 * Uses the caller's locals raw_buf_ptr, copy_buf_len, hit_eof and result.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
    if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
    { \
        if (extralen) \
            raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
        /* backslash just before EOF, treat as data char */ \
        result = true; \
        break; \
    } \
} else ((void) 0)

/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Appends the bytes raw_buf[raw_buf_index .. raw_buf_ptr) to
 * cstate->line_buf and then advances cstate->raw_buf_index to raw_buf_ptr.
 * Does nothing when raw_buf_ptr has not moved past raw_buf_index.
 *
 * Uses the caller's "cstate" and local raw_buf_ptr.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
    if (raw_buf_ptr > cstate->raw_buf_index) \
    { \
        appendBinaryStringInfo(&cstate->line_buf, \
                             cstate->raw_buf + cstate->raw_buf_index, \
                               raw_buf_ptr - cstate->raw_buf_index); \
        cstate->raw_buf_index = raw_buf_ptr; \
    } \
} else ((void) 0)

/*
 * Undo any read-ahead and jump out of the block.
 *
 * Resets raw_buf_ptr to just past the byte fetched at prev_raw_ptr and jumps
 * to the label "not_end_of_copy", which the function using this macro must
 * define.  Uses the caller's locals raw_buf_ptr and prev_raw_ptr.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
    raw_buf_ptr = prev_raw_ptr + 1; \
    goto not_end_of_copy; \
} else ((void) 0)

/*
 * Fixed 11-byte signature: "PGCOPY", newline, 0xFF (octal \377), CR, LF, NUL.
 * The array length is spelled out as 11 so that exactly the bytes written in
 * the literal are stored, without an additional string terminator.
 *
 * NOTE(review): presumably written/checked at the start of binary-format
 * COPY data; the users of this constant are outside this section -- confirm.
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";


/* non-export function prototypes */
/*
 * The "shards" parameter exists only when _SHARDING_ is defined.  The comma
 * that separates it from "options" is kept inside the #ifdef so that the
 * parameter list remains valid C (no dangling ", )") when _SHARDING_ is not
 * defined.  With _SHARDING_ defined the declarations are unchanged.
 */
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
          RawStmt *raw_query, Oid queryRelId, List *attnamelist,
          List *options
#ifdef _SHARDING_
          , Bitmapset * shards
#endif
            );
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
            Oid queryRelId, const char *filename, bool is_program,
            List *attnamelist, List *options
#ifdef _SHARDING_
            , Bitmapset * shards
#endif
            );
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
             Datum *values, bool *nulls);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
                    CommandId mycid, int hi_options,
                    ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
                    BulkInsertState bistate,
                    int nBufferedTuples, HeapTuple *bufferedTuples,
                    int firstBufferedLineNo);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
#ifdef __OPENTENBASE__
static int CopyReadAttributesInternal(CopyState cstate);
#endif
static int    CopyReadAttributesText(CopyState cstate);
static int    CopyReadAttributesCSV(CopyState cstate);
static Datum CopyReadBinaryAttribute(CopyState cstate,
                        int column_no, FmgrInfo *flinfo,
                        Oid typioparam, int32 typmod,
                        bool *isnull);
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
                    bool use_quote, bool single_attr);
static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
               List *attnamelist);
static char *limit_printout_length(const char *str);

/* Low-level communications functions */
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);
static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static void CopySendEndOfRow(CopyState cstate);
static int CopyGetData(CopyState cstate, void *databuf,
            int minread, int maxread);
static void CopySendInt32(CopyState cstate, int32 val);
static bool CopyGetInt32(CopyState cstate, int32 *val);
static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);


#ifdef PGXC
static RemoteCopyOptions *GetRemoteCopyOptions(CopyState cstate);
static void append_defvals(Datum *values, CopyState cstate);
#endif

/*
 * SendCopyBegin
 *
 * Announce to the frontend that COPY OUT data follows, and remember in
 * cstate->copy_dest which protocol variant is in use.  The start/stop
 * messages for frontend copies have changed in past protocol redesigns,
 * hence the two code paths.
 */
static void
SendCopyBegin(CopyState cstate)
{
    StringInfoData msg;
    int16        fmt;
    int            ncols;
    int            col;

    if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
    {
        /* old way: binary is not available, and the 'H' message is empty */
        if (cstate->binary)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY BINARY is not supported to stdout or from stdin")));
        pq_putemptymessage('H');
        /* grottiness needed for old COPY OUT protocol */
        pq_startcopyout();
        cstate->copy_dest = COPY_OLD_FE;
        return;
    }

    /* new way: 'H' message with overall format plus one format per column */
    ncols = list_length(cstate->attnumlist);
    fmt = (cstate->binary ? 1 : 0);

    pq_beginmessage(&msg, 'H');
    pq_sendbyte(&msg, fmt);        /* overall format */
    pq_sendint(&msg, ncols, 2);
    for (col = 0; col < ncols; col++)
        pq_sendint(&msg, fmt, 2);    /* per-column formats */
    pq_endmessage(&msg);

    cstate->copy_dest = COPY_NEW_FE;
}

/*
 * ReceiveCopyBegin
 *
 * Tell the frontend that we are ready to receive COPY IN data, record the
 * protocol variant in cstate->copy_dest, and flush so the frontend actually
 * sees the go-ahead.  For the 3.0 protocol a fresh fe_msgbuf is created to
 * hold incoming messages.
 */
static void
ReceiveCopyBegin(CopyState cstate)
{
    if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
    {
        /* old way */
        if (cstate->binary)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY BINARY is not supported to stdout or from stdin")));
        pq_putemptymessage('G');
        /* any error in old protocol will make us lose sync */
        pq_startmsgread();
        cstate->copy_dest = COPY_OLD_FE;
    }
    else
    {
        /* new way */
        StringInfoData msg;
        int16        fmt = (cstate->binary ? 1 : 0);
        int            ncols = list_length(cstate->attnumlist);
        int            col = 0;

        pq_beginmessage(&msg, 'G');
        pq_sendbyte(&msg, fmt);        /* overall format */
        pq_sendint(&msg, ncols, 2);
        while (col < ncols)
        {
            pq_sendint(&msg, fmt, 2);    /* per-column formats */
            col++;
        }
        pq_endmessage(&msg);

        cstate->copy_dest = COPY_NEW_FE;
        cstate->fe_msgbuf = makeStringInfo();
    }

    /* We *must* flush here to ensure FE knows it can send. */
    pq_flush();
}

/*
 * SendCopyEnd
 *
 * Terminate a COPY OUT to the frontend.  The 3.0 protocol gets an empty 'c'
 * message; any other destination gets the textual "\." trailer, sent as a
 * final row, followed by pq_endcopyout().
 */
static void
SendCopyEnd(CopyState cstate)
{
    if (cstate->copy_dest != COPY_NEW_FE)
    {
        CopySendData(cstate, "\\.", 2);
        /* Need to flush out the trailer (this also appends a newline) */
        CopySendEndOfRow(cstate);
        pq_endcopyout(false);
        return;
    }

    /* Shouldn't have any unsent data */
    Assert(cstate->fe_msgbuf->len == 0);
    /* Send Copy Done message */
    pq_putemptymessage('c');
}

/*----------
 * Output helpers.  All of them only append to cstate->fe_msgbuf; nothing
 * reaches the destination until CopySendEndOfRow is called.
 *
 * CopySendData        append an arbitrary run of bytes
 * CopySendString    append a null-terminated string (without the terminator)
 * CopySendChar        append a single character
 * CopySendEndOfRow    do the appropriate thing at end of each data row
 *
 * NB: no data conversion is applied by these functions
 *----------
 */
static void
CopySendData(CopyState cstate, const void *databuf, int datasize)
{
    StringInfo    out = cstate->fe_msgbuf;

    appendBinaryStringInfo(out, databuf, datasize);
}

/*
 * CopySendString
 *
 * Append the null-terminated string str (excluding its terminator) to the
 * pending output row in cstate->fe_msgbuf.
 */
static void
CopySendString(CopyState cstate, const char *str)
{
    StringInfo    out = cstate->fe_msgbuf;

    appendBinaryStringInfo(out, str, strlen(str));
}

/*
 * CopySendChar
 *
 * Append the single character c to the pending output row in
 * cstate->fe_msgbuf.
 */
static void
CopySendChar(CopyState cstate, char c)
{
    StringInfo    out = cstate->fe_msgbuf;

    appendStringInfoCharMacro(out, c);
}

/*
 * CopySendEndOfRow
 *
 * Complete the row accumulated in cstate->fe_msgbuf and hand it to the
 * destination selected by cstate->copy_dest:
 *
 *    COPY_FILE      append the platform line ending (text formats only) and
 *                   fwrite() the row; a failed write raises ERROR
 *    COPY_OLD_FE    append '\n' (text formats only) and send the raw bytes;
 *                   a send failure is FATAL
 *    COPY_NEW_FE    append '\n' (text formats only) and send the row as one
 *                   'd' message
 *    COPY_BUFFER    (PGXC) leave the row in the buffer and return at once
 *    COPY_CALLBACK  not supported for output
 *
 * Except for COPY_BUFFER, the buffer is emptied before returning.
 */
static void
CopySendEndOfRow(CopyState cstate)
{// #lizard forgives
    StringInfo    rowbuf = cstate->fe_msgbuf;
    bool        text_format = !cstate->binary;

    switch (cstate->copy_dest)
    {
        case COPY_FILE:
            /* Default line termination depends on platform */
            if (text_format)
            {
#ifndef WIN32
                CopySendChar(cstate, '\n');
#else
                CopySendString(cstate, "\r\n");
#endif
            }

            if (fwrite(rowbuf->data, rowbuf->len, 1,
                       cstate->copy_file) != 1 ||
                ferror(cstate->copy_file))
            {
                /* Plain file: nothing more to learn, just report it */
                if (!cstate->is_program)
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("could not write to COPY file: %m")));

                if (errno == EPIPE)
                {
                    /*
                     * The pipe would be closed automatically at the end of
                     * the transaction anyway, but closing it now may produce
                     * a more useful error from the subprocess' exit code
                     * than a bare "Broken Pipe".
                     */
                    ClosePipeToProgram(cstate);

                    /*
                     * Getting here means ClosePipeToProgram() did not throw:
                     * the program exited normally but closed the pipe early.
                     * Put errno back for the %m below.
                     */
                    errno = EPIPE;
                }
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not write to COPY program: %m")));
            }
            break;

        case COPY_OLD_FE:
            /* The FE/BE protocol uses \n as newline for all platforms */
            if (text_format)
                CopySendChar(cstate, '\n');

            if (pq_putbytes(rowbuf->data, rowbuf->len))
            {
                /* no hope of recovering connection sync, so FATAL */
                ereport(FATAL,
                        (errcode(ERRCODE_CONNECTION_FAILURE),
                         errmsg("connection lost during COPY to stdout")));
            }
            break;

        case COPY_NEW_FE:
            /* The FE/BE protocol uses \n as newline for all platforms */
            if (text_format)
                CopySendChar(cstate, '\n');

            /* Dump the accumulated row as one CopyData message */
            (void) pq_putmessage('d', rowbuf->data, rowbuf->len);
            break;

#ifdef PGXC
        case COPY_BUFFER:
            /* Do not send yet anywhere, just return */
            return;
#endif

        case COPY_CALLBACK:
            Assert(false);        /* Not yet supported. */
            break;
    }

    resetStringInfo(rowbuf);
}

/*
 * CopyGetData reads data from the source (file or frontend)
 *
 * We attempt to read at least minread, and at most maxread, bytes from
 * the source.  The actual number of bytes read is returned; if this is
 * less than minread, EOF was detected.
 *
 * cstate:   COPY state; copy_dest selects the source
 * databuf:  destination buffer, must have room for maxread bytes
 * minread:  number of bytes the caller needs
 * maxread:  upper bound on the number of bytes stored into databuf
 *
 * Note: when copying from the frontend, we expect a proper EOF mark per
 * protocol; if the frontend simply drops the connection, we raise error.
 * It seems unwise to allow the COPY IN to complete normally in that case.
 *
 * Under __OPENTENBASE__, cstate->whole_line is reset to false on entry and
 * is only set to true on the COPY_NEW_FE path (see below).
 *
 * NB: no data conversion is applied here.
 */
static int
CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
{// #lizard forgives
    int            bytesread = 0;
#ifdef __OPENTENBASE__
    cstate->whole_line = false;
#endif

    switch (cstate->copy_dest)
    {
        case COPY_FILE:
            /*
             * A short count is not an error by itself; only a stream error
             * flagged by ferror() is reported.
             */
            bytesread = fread(databuf, 1, maxread, cstate->copy_file);
            if (ferror(cstate->copy_file))
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not read from COPY file: %m")));
            break;
        case COPY_OLD_FE:

            /*
             * We cannot read more than minread bytes (which in practice is 1)
             * because old protocol doesn't have any clear way of separating
             * the COPY stream from following data.  This is slow, but not any
             * slower than the code path was originally, and we don't care
             * much anymore about the performance of old protocol.
             */
            if (pq_getbytes((char *) databuf, minread))
            {
                /* Only a \. terminator is legal EOF in old protocol */
                ereport(ERROR,
                        (errcode(ERRCODE_CONNECTION_FAILURE),
                         errmsg("unexpected EOF on client connection with an open transaction")));
            }
            bytesread = minread;
            break;
        case COPY_NEW_FE:
            /*
             * Keep pulling bytes out of frontend messages until the caller's
             * minimum is met, the buffer is full, or the frontend has
             * signalled the end of the COPY data.
             */
            while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
            {
                int            avail;

                /* Current message used up (or none yet): fetch another one */
                while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
                {
                    /* Try to receive another message */
                    int            mtype;

                    /*
                     * Re-entry point used by the Flush/Sync case below to
                     * read the next message without re-testing the loop
                     * condition.
                     */
            readmessage:
                    /* Cancel interrupts are held off while a message is read */
                    HOLD_CANCEL_INTERRUPTS();
                    pq_startmsgread();
                    mtype = pq_getbyte();
                    if (mtype == EOF)
                        ereport(ERROR,
                                (errcode(ERRCODE_CONNECTION_FAILURE),
                                 errmsg("unexpected EOF on client connection with an open transaction")));
                    if (pq_getmessage(cstate->fe_msgbuf, 0))
                        ereport(ERROR,
                                (errcode(ERRCODE_CONNECTION_FAILURE),
                                 errmsg("unexpected EOF on client connection with an open transaction")));
                    RESUME_CANCEL_INTERRUPTS();
                    switch (mtype)
                    {
                        case 'd':    /* CopyData */
                            break;
                        case 'c':    /* CopyDone */
                            /* COPY IN correctly terminated by frontend */
                            cstate->fe_eof = true;
                            /* hand back whatever was gathered before the end mark */
                            return bytesread;
                        case 'f':    /* CopyFail */
                            /*
                             * The message body carries the failure text.  The
                             * g_enable_copy_silence setting (declared outside
                             * this file section) escalates the report from
                             * ERROR to FATAL.
                             */
                            if (g_enable_copy_silence)
                            {
                                ereport(FATAL,
                                    (errcode(ERRCODE_QUERY_CANCELED),
                                     errmsg("receive copyfail from cn, msg:%s",
                                            pq_getmsgstring(cstate->fe_msgbuf))));
                            }
                            else
                            {
                                ereport(ERROR,
                                    (errcode(ERRCODE_QUERY_CANCELED),
                                     errmsg("COPY from stdin failed: %s",
                                            pq_getmsgstring(cstate->fe_msgbuf))));
                            }
                            break;
                        case 'H':    /* Flush */
                        case 'S':    /* Sync */

                            /*
                             * Ignore Flush/Sync for the convenience of client
                             * libraries (such as libpq) that may send those
                             * without noticing that the command they just
                             * sent was COPY.
                             */
                            goto readmessage;
                        default:
                            ereport(ERROR,
                                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                     errmsg("unexpected message type 0x%02X during COPY from stdin",
                                            mtype)));
                            break;
                    }
                }
                /* Bytes of the current message not yet handed to the caller */
                avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
#ifdef __OPENTENBASE__
                /*
                 * whole_line records whether the rest of the current message
                 * fits into the space the caller still has; the value from
                 * the last loop iteration is the one the caller sees.
                 */
                if (avail > maxread)
                {
                    cstate->whole_line = false;
                }
                else
                {
                    cstate->whole_line = true;
                }
#endif
                /* Never copy more than the caller has room for */
                if (avail > maxread)
                    avail = maxread;
                pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
                databuf = (void *) ((char *) databuf + avail);
                maxread -= avail;
                bytesread += avail;
            }
            break;
#ifdef PGXC
        case COPY_BUFFER:
            elog(ERROR, "COPY_BUFFER not allowed in this context");
            break;
#endif
        case COPY_CALLBACK:
            /* The callback decides how many bytes it stores and reports */
            bytesread = cstate->data_source_cb(databuf, minread, maxread);
            break;
    }

    return bytesread;
}


/*
 * These functions do apply some data conversion
 */

/*
 * CopySendInt32
 *
 * Append val to the pending output as four bytes in network byte order.
 */
static void
CopySendInt32(CopyState cstate, int32 val)
{
    uint32        netval = htonl((uint32) val);

    CopySendData(cstate, &netval, sizeof(netval));
}

/*
 * CopyGetInt32
 *
 * Read four bytes in network byte order from the COPY source and store the
 * decoded int32 in *val.
 *
 * Returns true if OK, false if EOF (fewer than four bytes were available);
 * in the EOF case *val is set to 0.
 */
static bool
CopyGetInt32(CopyState cstate, int32 *val)
{
    uint32        netval;

    if (CopyGetData(cstate, &netval, sizeof(netval), sizeof(netval)) == sizeof(netval))
    {
        *val = (int32) ntohl(netval);
        return true;
    }

    *val = 0;                    /* suppress compiler warning */
    return false;
}

/*
 * CopySendInt16
 *
 * Append val to the pending output as two bytes in network byte order.
 */
static void
CopySendInt16(CopyState cstate, int16 val)
{
    uint16        netval = htons((uint16) val);

    CopySendData(cstate, &netval, sizeof(netval));
}

/*
 * CopyGetInt16
 *
 * Read two bytes in network byte order from the COPY source and store the
 * decoded int16 in *val.
 *
 * Returns true if OK, false if EOF (fewer than two bytes were available);
 * in the EOF case *val is set to 0.
 */
static bool
CopyGetInt16(CopyState cstate, int16 *val)
{
    uint16        netval;

    if (CopyGetData(cstate, &netval, sizeof(netval), sizeof(netval)) == sizeof(netval))
    {
        *val = (int16) ntohs(netval);
        return true;
    }

    *val = 0;                    /* suppress compiler warning */
    return false;
}

/*
 * CopyLoadRawBuf
 *
 * Pull more input into cstate->raw_buf.
 *
 * Any bytes not yet processed (raw_buf_index < raw_buf_len) are first moved
 * to the front of the buffer and the new data is read in behind them; this
 * case is used only when a frontend multibyte character crosses a
 * bufferload boundary.  Afterwards raw_buf_index is 0, raw_buf_len covers
 * the carried-over plus the new bytes, and raw_buf[raw_buf_len] is '\0'.
 *
 * Returns TRUE if at least one new byte was obtained, else FALSE.
 */
static bool
CopyLoadRawBuf(CopyState cstate)
{
    int            carried = 0;    /* unprocessed bytes kept from the old load */
    int            fresh;            /* bytes obtained by this call */
    int            total;

    if (cstate->raw_buf_index < cstate->raw_buf_len)
    {
        /* Slide the unprocessed tail down to the start of the buffer */
        carried = cstate->raw_buf_len - cstate->raw_buf_index;
        memmove(cstate->raw_buf,
                cstate->raw_buf + cstate->raw_buf_index,
                carried);
    }

    fresh = CopyGetData(cstate, cstate->raw_buf + carried,
                        1, RAW_BUF_SIZE - carried);
    total = carried + fresh;

    cstate->raw_buf[total] = '\0';
    cstate->raw_buf_index = 0;
    cstate->raw_buf_len = total;

    return (fresh > 0);
}


/*
 *     DoCopy executes the SQL COPY statement
 *
 * Either unload or reload the contents of the relation named by
 * stmt->relation, depending on stmt->is_from (true means we are inserting
 * into the table).  In the "TO" case we also support copying the output of
 * an arbitrary SELECT, INSERT, UPDATE or DELETE query (stmt->query).
 *
 * If stmt->filename is set, transfer is between the table and that file
 * (or program, when stmt->is_program is set).  Otherwise, transfer is
 * between the table and our regular input/output stream.  The latter could
 * be either stdin/stdout or a socket, depending on whether we're running
 * under Postmaster control.
 *
 * Do not allow a Postgres user without superuser privilege to read from
 * or write to a file or program.  Under __OPENTENBASE__, a file COPY that
 * was transformed from a multi-values INSERT (stmt->insert_into) is exempt
 * from the superuser check.
 *
 * Do not allow the copy if user doesn't have proper permission to access
 * the table or the specifically requested columns.
 *
 * Parameters:
 *   pstate        - parse state; the relation's range table entry is added
 *                   to it for the permission check
 *   stmt          - the COPY statement to execute
 *   stmt_location - copied into the RawStmt built for a query-based COPY
 *   stmt_len      - copied into the RawStmt built for a query-based COPY
 *   processed     - output: receives the count returned by CopyFrom() or
 *                   DoCopyTo()
 */
void
DoCopy(ParseState *pstate, const CopyStmt *stmt,
       int stmt_location, int stmt_len,
       uint64 *processed)
{// #lizard forgives
    CopyState    cstate;
    bool        is_from = stmt->is_from;
    bool        pipe = (stmt->filename == NULL);
    Relation    rel;
    Oid            relid;
    RawStmt    *query = NULL;

    /* Disallow COPY to/from file or program except to superusers. */
    if (!pipe && !superuser())
    {
        if (stmt->is_program)
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("must be superuser to COPY to or from an external program"),
                     errhint("Anyone can COPY to stdout or from stdin. "
                             "psql's \\copy command also works for anyone.")));
        else
#ifdef __OPENTENBASE__
        {
            /* 
             * if transformed from insert into multi-values, 
             * non-superuser permitted to do copy 
             */
            if (!stmt->insert_into)
            {
#endif
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("must be superuser to COPY to or from a file"),
                     errhint("Anyone can COPY to stdout or from stdin. "
                             "psql's \\copy command also works for anyone.")));
#ifdef __OPENTENBASE__
            }
        }
#endif
    }

#ifdef __OPENTENBASE__
    /* Make the statement reachable from the parse state (read by BeginCopy). */
    pstate->stmt = stmt;
#endif

    if (stmt->relation)
    {
        TupleDesc    tupDesc;
        List       *attnums;
        ListCell   *cur;
        RangeTblEntry *rte;

        Assert(!stmt->query);

        /* Open and lock the relation, using the appropriate lock type. */
        rel = heap_openrv(stmt->relation,
                          (is_from ? RowExclusiveLock : AccessShareLock));

        relid = RelationGetRelid(rel);

        rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
        rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);

        /* Record the referenced columns so column-level privileges are checked. */
        tupDesc = RelationGetDescr(rel);
        attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
        foreach(cur, attnums)
        {
            int            attno = lfirst_int(cur) -
            FirstLowInvalidHeapAttributeNumber;

            if (is_from)
                rte->insertedCols = bms_add_member(rte->insertedCols, attno);
            else
                rte->selectedCols = bms_add_member(rte->selectedCols, attno);
        }
        ExecCheckRTPerms(pstate->p_rtable, true);

        /*
         * Permission check for row security policies.
         *
         * check_enable_rls will ereport(ERROR) if the user has requested
         * something invalid and will otherwise indicate if we should enable
         * RLS (returns RLS_ENABLED) or not for this COPY statement.
         *
         * If the relation has a row security policy and we are to apply it
         * then perform a "query" copy and allow the normal query processing
         * to handle the policies.
         *
         * If RLS is not enabled for this, then just fall through to the
         * normal non-filtering relation handling.
         */
        if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
        {
            SelectStmt *select;
            ColumnRef  *cr;
            ResTarget  *target;
            RangeVar   *from;
            List       *targetList = NIL;

            if (is_from)
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("COPY FROM not supported with row-level security"),
                         errhint("Use INSERT statements instead.")));

            /*
             * Build target list
             *
             * If no columns are specified in the attribute list of the COPY
             * command, then the target list is 'all' columns. Therefore, '*'
             * should be used as the target list for the resulting SELECT
             * statement.
             *
             * In the case that columns are specified in the attribute list,
             * create a ColumnRef and ResTarget for each column and add them
             * to the target list for the resulting SELECT statement.
             */
            if (!stmt->attlist)
            {
                cr = makeNode(ColumnRef);
                cr->fields = list_make1(makeNode(A_Star));
                cr->location = -1;

                target = makeNode(ResTarget);
                target->name = NULL;
                target->indirection = NIL;
                target->val = (Node *) cr;
                target->location = -1;

                targetList = list_make1(target);
            }
            else
            {
                ListCell   *lc;

                foreach(lc, stmt->attlist)
                {
                    /*
                     * Build the ColumnRef for each column.  The ColumnRef
                     * 'fields' property is a String 'Value' node (see
                     * nodes/value.h) that corresponds to the column name
                     * respectively.
                     */
                    cr = makeNode(ColumnRef);
                    cr->fields = list_make1(lfirst(lc));
                    cr->location = -1;

                    /* Build the ResTarget and add the ColumnRef to it. */
                    target = makeNode(ResTarget);
                    target->name = NULL;
                    target->indirection = NIL;
                    target->val = (Node *) cr;
                    target->location = -1;

                    /* Add each column to the SELECT statement's target list */
                    targetList = lappend(targetList, target);
                }
            }

            /*
             * Build RangeVar for from clause, fully qualified based on the
             * relation which we have opened and locked.
             */
            from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
                                pstrdup(RelationGetRelationName(rel)),
                                -1);

            /* Build query */
            select = makeNode(SelectStmt);
            select->targetList = targetList;
            select->fromClause = list_make1(from);

            query = makeNode(RawStmt);
            query->stmt = (Node *) select;
            query->stmt_location = stmt_location;
            query->stmt_len = stmt_len;

            /*
             * Close the relation for now, but keep the lock on it to prevent
             * changes between now and when we start the query-based COPY.
             *
             * We'll reopen it later as part of the query-based COPY.
             *
             * Note that relid still holds the relation's OID from here on,
             * while rel is NULL.
             */
            heap_close(rel, NoLock);
            rel = NULL;
        }
    }
    else
    {
        Assert(stmt->query);

        query = makeNode(RawStmt);
        query->stmt = stmt->query;
        query->stmt_location = stmt_location;
        query->stmt_len = stmt_len;

        relid = InvalidOid;
        rel = NULL;
    }

    if (is_from)
    {
        Assert(rel);

        /* check read-only transaction,gtm read-only flag and parallel mode */
        if ((XactReadOnly  || GTM_ReadOnly) && (rel && !rel->rd_islocaltemp))
            PreventCommandIfReadOnly("COPY FROM");
        PreventCommandIfParallelMode("COPY FROM");

        cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
                               NULL, stmt->attlist, stmt->options);
        *processed = CopyFrom(cstate);    /* copy from file to database */
#ifdef XCP
        /*
         * We should record insert to distributed table.
         * Bulk inserts into local tables are recorded when heap tuples are
         * written.
         *
         * NOTE(review): the uint64 row count is narrowed to int here.
         */
        if (IS_PGXC_COORDINATOR && rel->rd_locator_info)
            pgstat_count_remote_insert(rel, (int) *processed);
#endif
        EndCopyFrom(cstate);
    }
    else
    {
#ifdef _SHARDING_
        Bitmapset * bms_shards = NULL;

        /* Exporting by shard id only makes sense for a sharded relation. */
        if(rel && !RelationIsSharded(rel) && stmt->shards)
        {            
            /* Oid is unsigned, so print it with %u rather than %d. */
            elog(ERROR, "relation %u is not a sharded table, so it can not be export by specifed shardid.", 
                        RelationGetRelid(rel));
        }
        
        /* Validate the requested shard ids and collect them into a bitmapset. */
        if(stmt->shards)
        {
            ListCell *lc;
            int shardid;
            
            foreach(lc,stmt->shards)
            {
                A_Const * cons = (A_Const *)lfirst(lc);
                if(cons->val.type != T_Integer)
                    elog(ERROR, "shard must be a integer");
                
                shardid = intVal(&cons->val);

                if(!ShardIDIsValid(shardid))
                    elog(ERROR, "shardid %d is invalid.", shardid);
                bms_shards = bms_add_member(bms_shards, shardid);
            }
        }
#endif
        cstate = BeginCopyTo(pstate, rel, query, relid,
                             stmt->filename, stmt->is_program,
                             stmt->attlist, stmt->options,
#ifdef _SHARDING_
                            bms_shards
#endif                             
                             );

        *processed = DoCopyTo(cstate);    /* copy from database to file */
#ifdef _SHARDING_
        if (cstate->shard_array)
        {
            bms_free(cstate->shard_array);
        }
#endif

#ifdef __STORAGE_SCALABLE__
        if (am_walsender && !AmOpenTenBaseSubscriptionWalSender())
        {
            Assert(MyReplicationSlot);
            if (MyReplicationSlot)
            {
                if (OidIsValid(MyReplicationSlot->subid) && OidIsValid(MyReplicationSlot->relid))
                {
                    /*
                     * Compare against the saved relid instead of
                     * dereferencing rel: rel is NULL for a query-based COPY
                     * and after the row-level-security rewrite above.  relid
                     * is InvalidOid for a plain query COPY, which can never
                     * match the (valid) slot relid.
                     */
                    if (MyReplicationSlot->relid == relid)
                    {
                        UpdatePubStatistics(MyReplicationSlot->subname.data, *processed, 0, 0, 0, 0, false);
                        UpdatePubTableStatistics(MyReplicationSlot->subid, MyReplicationSlot->relid, *processed, 0, 0, 0, 0, false);
                    }
                }
            }
        }
#endif
        
        EndCopyTo(cstate);
    }

    /*
     * Close the relation. If reading, we can release the AccessShareLock we
     * got; if writing, we should hold the lock until end of transaction to
     * ensure that updates will be committed before lock is released.
     */
    if (rel != NULL)
        heap_close(rel, (is_from ? NoLock : AccessShareLock));
}

/*
 * Process the statement option list for COPY.
 *
 * Scan the options list (a list of DefElem) and transpose the information
 * into cstate, applying appropriate error checking.
 *
 * cstate is assumed to be filled with zeroes initially.
 *
 * This is exported so that external users of the COPY API can sanity-check
 * a list of options.  In that usage, cstate should be passed as NULL
 * (since external users don't know sizeof(CopyStateData)) and the collected
 * data is just leaked until CurrentMemoryContext is reset.
 *
 * Note that additional checking, such as whether column names listed in FORCE
 * QUOTE actually exist, has to be applied later.  This just checks for
 * self-consistency of the options list.
 *
 * Parameters:
 *   pstate  - parse state, used only to attach a cursor position to errors
 *   cstate  - zero-filled copy state to fill in, or NULL to merely validate
 *   is_from - true for COPY FROM, false for COPY TO; decides which
 *             direction-specific options are acceptable
 *   options - list of DefElem nodes to process
 *
 * Every violation is reported with ereport(ERROR); the function returns
 * normally only when the option list is self-consistent.
 */
void
ProcessCopyOptions(ParseState *pstate,
                   CopyState cstate,
                   bool is_from,
                   List *options)
{// #lizard forgives
    bool        format_specified = false;
    ListCell   *option;

    /* Support external use for option sanity checking */
    if (cstate == NULL)
        cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

    /* -1 means "no ENCODING option seen yet" (see the duplicate check below) */
    cstate->file_encoding = -1;

    /* Extract options from the statement node tree */
    foreach(option, options)
    {
        DefElem    *defel = lfirst_node(DefElem, option);

        if (strcmp(defel->defname, "format") == 0)
        {
            char       *fmt = defGetString(defel);

            if (format_specified)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            format_specified = true;
            if (strcmp(fmt, "text") == 0)
                 /* default format */ ;
            else if (strcmp(fmt, "csv") == 0)
                cstate->csv_mode = true;
            else if (strcmp(fmt, "binary") == 0)
                cstate->binary = true;
#ifdef __OPENTENBASE__
            else if (strcmp(fmt, "internal") == 0)
            {
                /*
                 * The "internal" format is rejected on a coordinator with
                 * the same error as an unknown format name.  Report it
                 * through ereport with a proper SQLSTATE and cursor
                 * position, since this is a user-input error rather than
                 * an internal one.
                 */
                if (IS_PGXC_COORDINATOR)
                {
                    ereport(ERROR,
                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                             errmsg("COPY format \"%s\" not recognized", fmt),
                             parser_errposition(pstate, defel->location)));
                }
                cstate->internal_mode = true;
            }
#endif
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("COPY format \"%s\" not recognized", fmt),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "oids") == 0)
        {
            if (cstate->oids)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->oids = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "freeze") == 0)
        {
            if (cstate->freeze)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->freeze = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "delimiter") == 0)
        {
            if (cstate->delim)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->delim = defGetString(defel);
        }
        else if (strcmp(defel->defname, "null") == 0)
        {
            if (cstate->null_print)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->null_print = defGetString(defel);
        }
        else if (strcmp(defel->defname, "header") == 0)
        {
            if (cstate->header_line)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->header_line = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "quote") == 0)
        {
            if (cstate->quote)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->quote = defGetString(defel);
        }
        else if (strcmp(defel->defname, "escape") == 0)
        {
            if (cstate->escape)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->escape = defGetString(defel);
        }
        else if (strcmp(defel->defname, "force_quote") == 0)
        {
            if (cstate->force_quote || cstate->force_quote_all)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            /* FORCE_QUOTE accepts either '*' or an explicit column list */
            if (defel->arg && IsA(defel->arg, A_Star))
                cstate->force_quote_all = true;
            else if (defel->arg && IsA(defel->arg, List))
                cstate->force_quote = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "force_not_null") == 0)
        {
            if (cstate->force_notnull)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            if (defel->arg && IsA(defel->arg, List))
                cstate->force_notnull = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "force_null") == 0)
        {
            /* Report the cursor position like every other duplicate check. */
            if (cstate->force_null)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            if (defel->arg && IsA(defel->arg, List))
                cstate->force_null = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "convert_selectively") == 0)
        {
            /*
             * Undocumented, not-accessible-from-SQL option: convert only the
             * named columns to binary form, storing the rest as NULLs. It's
             * allowed for the column list to be NIL.
             */
            if (cstate->convert_selectively)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->convert_selectively = true;
            if (defel->arg == NULL || IsA(defel->arg, List))
                cstate->convert_select = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "encoding") == 0)
        {
            if (cstate->file_encoding >= 0)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
            if (cstate->file_encoding < 0)
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a valid encoding name",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("option \"%s\" not recognized",
                            defel->defname),
                     parser_errposition(pstate, defel->location)));
    }

    /*
     * Check for incompatible options (must do these two before inserting
     * defaults)
     */
    if (cstate->binary && cstate->delim)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("cannot specify DELIMITER in BINARY mode")));

    if (cstate->binary && cstate->null_print)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("cannot specify NULL in BINARY mode")));

    /* Set defaults for omitted options */
    if (!cstate->delim)
        cstate->delim = cstate->csv_mode ? "," : "\t";

    if (!cstate->null_print)
        cstate->null_print = cstate->csv_mode ? "" : "\\N";
    cstate->null_print_len = strlen(cstate->null_print);

    if (cstate->csv_mode)
    {
        if (!cstate->quote)
            cstate->quote = "\"";
        if (!cstate->escape)
            cstate->escape = cstate->quote;
    }

    /* Only single-byte delimiter strings are supported. */
    if (strlen(cstate->delim) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY delimiter must be a single one-byte character")));

    /* Disallow end-of-line characters */
    if (strchr(cstate->delim, '\r') != NULL ||
        strchr(cstate->delim, '\n') != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter cannot be newline or carriage return")));

    if (strchr(cstate->null_print, '\r') != NULL ||
        strchr(cstate->null_print, '\n') != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY null representation cannot use newline or carriage return")));

    /*
     * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
     * backslash because it would be ambiguous.  We can't allow the other
     * cases because data characters matching the delimiter must be
     * backslashed, and certain backslash combinations are interpreted
     * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
     * more than strictly necessary, but seems best for consistency and
     * future-proofing.  Likewise we disallow all digits though only octal
     * digits are actually dangerous.
     */
    if (!cstate->csv_mode &&
        strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
               cstate->delim[0]) != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));

    /* Check header */
    if (!cstate->csv_mode && cstate->header_line)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY HEADER available only in CSV mode")));

    /* Check quote */
    if (!cstate->csv_mode && cstate->quote != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY quote available only in CSV mode")));

    if (cstate->csv_mode && strlen(cstate->quote) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY quote must be a single one-byte character")));

    if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter and quote must be different")));

    /* Check escape */
    if (!cstate->csv_mode && cstate->escape != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY escape available only in CSV mode")));

    if (cstate->csv_mode && strlen(cstate->escape) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY escape must be a single one-byte character")));

    /* Check force_quote */
    if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force quote available only in CSV mode")));
    if ((cstate->force_quote || cstate->force_quote_all) && is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force quote only available using COPY TO")));

    /* Check force_notnull */
    if (!cstate->csv_mode && cstate->force_notnull != NIL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force not null available only in CSV mode")));
    if (cstate->force_notnull != NIL && !is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force not null only available using COPY FROM")));

    /* Check force_null */
    if (!cstate->csv_mode && cstate->force_null != NIL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force null available only in CSV mode")));

    if (cstate->force_null != NIL && !is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force null only available using COPY FROM")));

    /* Don't allow the delimiter to appear in the null string. */
    if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY delimiter must not appear in the NULL specification")));

    /* Don't allow the CSV quote char to appear in the null string. */
    if (cstate->csv_mode &&
        strchr(cstate->null_print, cstate->quote[0]) != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("CSV quote character must not appear in the NULL specification")));
}

/*
 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
 *
 * Iff <binary>, unload or reload in the binary format, as opposed to the
 * more wasteful but more robust and portable text format.
 *
 * Iff <oids>, unload or reload the format that includes OID information.
 * On input, we accept OIDs whether or not the table has an OID column,
 * but silently drop them if it does not.  On output, we report an error
 * if the user asks for OIDs in a table that has none (not providing an
 * OID column might seem friendlier, but could seriously confuse programs).
 *
 * If in the text format, delimit columns with delimiter <delim> and print
 * NULL values as <null_print>.
 */
static CopyState
BeginCopy(ParseState *pstate,
          bool is_from,
          Relation rel,
          RawStmt *raw_query,
          Oid queryRelId,
          List *attnamelist,
          List *options,
#ifdef _SHARDING_
          Bitmapset * shards
#endif
)
{// #lizard forgives
    CopyState    cstate;
    TupleDesc    tupDesc;
    int            num_phys_attrs;
    MemoryContext oldcontext;

    /* Allocate workspace and zero all fields */
    cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

    /*
     * We allocate everything used by a cstate in a new memory context. This
     * avoids memory leaks during repeated use of COPY in a query.
     */
    cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
                                                "COPY",
                                                ALLOCSET_DEFAULT_SIZES);

    oldcontext = MemoryContextSwitchTo(cstate->copycontext);

    /* Extract options from the statement node tree */
    ProcessCopyOptions(pstate, cstate, is_from, options);
#ifdef _SHARDING_
    cstate->shard_array = shards;
    cstate->insert_into = false;
#endif

    /* Process the source/target relation or query */
    if (rel)
    {
        Assert(!raw_query);

        cstate->rel = rel;

        tupDesc = RelationGetDescr(cstate->rel);

        /* Don't allow COPY w/ OIDs to or from a table without them */
        if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_COLUMN),
                     errmsg("table \"%s\" does not have OIDs",
                            RelationGetRelationName(cstate->rel))));

        /*
         * If there are any triggers with transition tables on the named
         * relation, we need to be prepared to capture transition tuples.
         */
        cstate->transition_capture = MakeTransitionCaptureState(rel->trigdesc);

        /* Initialize state for CopyFrom tuple routing. */
        if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        {
            PartitionTupleRouting *proute;

            proute = cstate->partition_tuple_routing =
			ExecSetupPartitionTupleRouting(NULL, cstate->rel);

            /*
             * If we are capturing transition tuples, they may need to be
             * converted from partition format back to partitioned table
             * format (this is only ever necessary if a BEFORE trigger
             * modifies the tuple).
             */
            if (cstate->transition_capture != NULL)
				ExecSetupChildParentMapForLeaf(proute);
        }
#ifdef PGXC
        /* Get copy statement and execution node information */
        if (IS_PGXC_COORDINATOR)
        {
            RemoteCopyData *remoteCopyState = (RemoteCopyData *) palloc0(sizeof(RemoteCopyData));
            List *attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

            /* Setup correct COPY FROM/TO flag */
            remoteCopyState->is_from = is_from;

            /* Get execution node list */
            RemoteCopy_GetRelationLoc(remoteCopyState,
                                      cstate->rel,
                                      attnums);
#ifdef __OPENTENBASE__
            if (pstate && pstate->stmt && pstate->stmt->insert_into)
            {
                cstate->insert_into = true;
            }
#endif
            /* Build remote query */
            RemoteCopy_BuildStatement(remoteCopyState,
                                      cstate->rel,
                                      GetRemoteCopyOptions(cstate),
                                      attnamelist,
                                      attnums,
#ifdef _SHARDING_
                                        shards
#endif
                                        );

            /* Then assign built structure */
            cstate->remoteCopyState = remoteCopyState;
        }
#endif
    }
    else
    {
        List       *rewritten;
        Query       *query;
        PlannedStmt *plan;
        DestReceiver *dest;

        Assert(!is_from);
        cstate->rel = NULL;

        /* Don't allow COPY w/ OIDs from a query */
        if (cstate->oids)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY (query) WITH OIDS is not supported")));

        /*
         * The grammar allows SELECT INTO, but we don't support that. We do
         * this check before transforming the query since QueryRewriteCTAS will
         * transform the statement into a CREATE TABLE followed by INSERT INTO.
         * The CREATE TABLE is processed separately by QueryRewriteCTAS and
         * what we get back is just the INSERT statement. Not doing a check
         * will ultimately lead to an error, but doing it here allows us to
         * throw a more friendly and PG-compatible error.
         */
        if (IsA(raw_query->stmt, SelectStmt))
        {
            SelectStmt *stmt = (SelectStmt *) raw_query->stmt;

            /*
             * If it's a set-operation tree, drilldown to leftmost SelectStmt
             */
            while (stmt && stmt->op != SETOP_NONE)
                stmt = stmt->larg;
            Assert(stmt && IsA(stmt, SelectStmt) && stmt->larg == NULL);

            if (stmt && stmt->intoClause)
            {
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("COPY (SELECT INTO) is not supported")));
            }
        }

        /*
         * Run parse analysis and rewrite.  Note this also acquires sufficient
         * locks on the source table(s).
         *
         * Because the parser and planner tend to scribble on their input, we
         * make a preliminary copy of the source querytree.  This prevents
         * problems in the case that the COPY is in a portal or plpgsql
         * function and is executed repeatedly.  (See also the same hack in
         * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
         */
        rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
                                           pstate->p_sourcetext, NULL, 0,
                                           NULL);

        /* check that we got back something we can work with */
        if (rewritten == NIL)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
        }
        else if (list_length(rewritten) > 1)
        {
            ListCell   *lc;

            /* examine queries to determine which error message to issue */
            foreach(lc, rewritten)
            {
                Query       *q = lfirst_node(Query, lc);

                if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("conditional DO INSTEAD rules are not supported for COPY")));
                if (q->querySource == QSRC_NON_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("DO ALSO rules are not supported for the COPY")));
            }

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
        }

        query = linitial_node(Query, rewritten);

        Assert(query->utilityStmt == NULL);

        /*
         * Similarly the grammar doesn't enforce the presence of a RETURNING
         * clause, but this is required here.
         */
        if (query->commandType != CMD_SELECT &&
            query->returningList == NIL)
        {
            Assert(query->commandType == CMD_INSERT ||
                   query->commandType == CMD_UPDATE ||
                   query->commandType == CMD_DELETE);

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY query must have a RETURNING clause")));
        }

        /* plan the query */
        plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);

        /*
         * With row level security and a user using "COPY relation TO", we
         * have to convert the "COPY relation TO" to a query-based COPY (eg:
         * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
         * in any RLS clauses.
         *
         * When this happens, we are passed in the relid of the originally
         * found relation (which we have locked).  As the planner will look up
         * the relation again, we double-check here to make sure it found the
         * same one that we have locked.
         */
        if (queryRelId != InvalidOid)
        {
            /*
             * Note that with RLS involved there may be multiple relations,
             * and while the one we need is almost certainly first, we don't
             * make any guarantees of that in the planner, so check the whole
             * list and make sure we find the original relation.
             */
            if (!list_member_oid(plan->relationOids, queryRelId))
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("relation referenced by COPY statement has changed")));
        }

        /*
         * Use a snapshot with an updated command ID to ensure this query sees
         * results of any previously executed queries.
         */
        PushCopiedSnapshot(GetActiveSnapshot());
        UpdateActiveSnapshotCommandId();

        /* Create dest receiver for COPY OUT */
        dest = CreateDestReceiver(DestCopyOut);
        ((DR_copy *) dest)->cstate = cstate;

        /* Create a QueryDesc requesting no output */
        cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                            GetActiveSnapshot(),
                                            InvalidSnapshot,
                                            dest, NULL, NULL, 0);

        /*
         * Call ExecutorStart to prepare the plan for execution.
         *
         * ExecutorStart computes a result tupdesc for us
         */
		if (query->returningList != NIL)
			ExecutorStart(cstate->queryDesc, EXEC_FLAG_RETURNING);
		else
        ExecutorStart(cstate->queryDesc, 0);

        tupDesc = cstate->queryDesc->tupDesc;
    }

    /* Generate or convert list of attributes to process */
    cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

    num_phys_attrs = tupDesc->natts;

    /* Convert FORCE_QUOTE name list to per-column flags, check validity */
    cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_quote_all)
    {
        int            i;

        for (i = 0; i < num_phys_attrs; i++)
            cstate->force_quote_flags[i] = true;
    }
    else if (cstate->force_quote)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);

        foreach(cur, attnums)
        {
            int            attnum = lfirst_int(cur);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
                                NameStr(tupDesc->attrs[attnum - 1]->attname))));
            cstate->force_quote_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_notnull)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);

        foreach(cur, attnums)
        {
            int            attnum = lfirst_int(cur);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
                                NameStr(tupDesc->attrs[attnum - 1]->attname))));
            cstate->force_notnull_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NULL name list to per-column flags, check validity */
    cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_null)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);

        foreach(cur, attnums)
        {
            int            attnum = lfirst_int(cur);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
                                NameStr(tupDesc->attrs[attnum - 1]->attname))));
            cstate->force_null_flags[attnum - 1] = true;
        }
    }

    /* Convert convert_selectively name list to per-column flags */
    if (cstate->convert_selectively)
    {
        List       *attnums;
        ListCell   *cur;

        cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);

        foreach(cur, attnums)
        {
            int            attnum = lfirst_int(cur);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg_internal("selected column \"%s\" not referenced by COPY",
                                         NameStr(tupDesc->attrs[attnum - 1]->attname))));
            cstate->convert_select_flags[attnum - 1] = true;
        }
    }

    /* Use client encoding when ENCODING option is not specified. */
    if (cstate->file_encoding < 0)
        cstate->file_encoding = pg_get_client_encoding();

    /*
     * Set up encoding conversion info.  Even if the file and server encodings
     * are the same, we must apply pg_any_to_server() to validate data in
     * multibyte encodings.
     */
    cstate->need_transcoding =
        (cstate->file_encoding != GetDatabaseEncoding() ||
         pg_database_encoding_max_length() > 1);
    /* See Multibyte encoding comment above */
    cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);

    cstate->copy_dest = COPY_FILE;    /* default */

#ifdef PGXC
    /*
     * We are here just at copy begin process,
     * so only pick up the list of connections.
     */
    if (IS_PGXC_COORDINATOR)
    {
        RemoteCopyData *remoteCopyState = cstate->remoteCopyState;

        /*
         * In the case of CopyOut, it is just necessary to pick up one node randomly.
         * This is done when rel_loc is found.
         */
        if (remoteCopyState && remoteCopyState->rel_loc)
        {
            DataNodeCopyBegin(remoteCopyState);
            if (!remoteCopyState->locator)
                ereport(ERROR,
                        (errcode(ERRCODE_CONNECTION_EXCEPTION),
                         errmsg("Failed to initialize Datanodes for COPY")));
        }
    }
#endif

    MemoryContextSwitchTo(oldcontext);

    return cstate;
}

/*
 * ClosePipeToProgram
 *
 * Close the stream attached to the external command of a COPY that uses a
 * program (cstate->is_program must be set) and inspect what
 * ClosePipeStream() hands back.  A result of -1 is reported as a failure
 * to close the pipe; any other non-zero result is reported as a failure of
 * the program itself, with the decoded wait status as error detail.
 */
static void
ClosePipeToProgram(CopyState cstate)
{
    int            rc;

    Assert(cstate->is_program);

    rc = ClosePipeStream(cstate->copy_file);

    /* Nothing to report when the close returned zero. */
    if (rc == 0)
        return;

    if (rc == -1)
    {
        /* The close itself failed; errno supplies the %m text. */
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close pipe to external command: %m")));
    }
    else
    {
        /* The command ran but did not finish with a zero status. */
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("program \"%s\" failed",
                        cstate->filename),
                 errdetail_internal("%s", wait_result_to_str(rc))));
    }
}

/*
 * EndCopy
 *
 * Release resources allocated in a cstate for COPY TO/FROM: close the
 * program pipe or the named file (raising an error if the close fails),
 * then delete the COPY memory context and free the CopyState itself.
 */
static void
EndCopy(CopyState cstate)
{// #lizard forgives
    if (cstate->is_program)
        ClosePipeToProgram(cstate);
    else if (cstate->filename != NULL)
    {
        int            close_rc = 0;

#ifdef __OPENTENBASE__
        /* Only attempt the close when a file handle is actually present. */
        if (cstate->copy_file)
            close_rc = FreeFile(cstate->copy_file);
#else
        close_rc = FreeFile(cstate->copy_file);
#endif
        /* errno from the failed close is still intact for %m here. */
        if (close_rc != 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not close file \"%s\": %m",
                            cstate->filename)));
    }

    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
}

/*
 * BeginCopyTo
 *
 * Setup CopyState to read tuples from a table or a query for COPY TO.
 *
 * pstate, query, queryRelId, attnamelist, options and shards are passed
 * through unchanged to BeginCopy().  If 'rel' is given it must be a plain
 * table; every other relkind is rejected here with an error (plus, where
 * one applies, a hint to use the COPY (SELECT ...) TO form).
 *
 * 'filename' selects the destination:
 *     NULL        - copy_file is set to stdout unless whereToSendOutput is
 *                   DestRemote, in which case it is left as BeginCopy()
 *                   set it;
 *     is_program  - 'filename' is a command, opened with OpenPipeStream();
 *     otherwise   - 'filename' must be an absolute path; it is opened for
 *                   writing with group/other write permission masked off,
 *                   and must not turn out to be a directory.
 *
 * Returns the initialized CopyState.  The private copy of the file name is
 * allocated in cstate->copycontext.  Any failure is reported with
 * ereport(ERROR).
 */
static CopyState
BeginCopyTo(ParseState *pstate,
            Relation rel,
            RawStmt *query,
            Oid queryRelId,
            const char *filename,
            bool is_program,
            List *attnamelist,
            List *options,
#ifdef _SHARDING_
            Bitmapset * shards
#endif            
            )
{// #lizard forgives
    CopyState    cstate;
    bool        pipe = (filename == NULL);
    MemoryContext oldcontext;

    /* Only plain tables can be the direct source of COPY TO. */
    if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
    {
        if (rel->rd_rel->relkind == RELKIND_VIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from view \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from materialized view \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from foreign table \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from sequence \"%s\"",
                            RelationGetRelationName(rel))));
        else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from partitioned table \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from non-table relation \"%s\"",
                            RelationGetRelationName(rel))));
    }

    cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
                       options,
#ifdef _SHARDING_
                        shards
#endif
            );
    oldcontext = MemoryContextSwitchTo(cstate->copycontext);

    if (pipe)
    {
        Assert(!is_program);    /* the grammar does not allow this */
        if (whereToSendOutput != DestRemote)
            cstate->copy_file = stdout;
    }
    else
    {
        cstate->filename = pstrdup(filename);
        cstate->is_program = is_program;

        if (is_program)
        {
            cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
            if (cstate->copy_file == NULL)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not execute command \"%s\": %m",
                                cstate->filename)));
        }
        else
        {
            mode_t        oumask; /* Pre-existing umask value */
            struct stat st;

            /*
             * Prevent write to relative path ... too easy to shoot oneself in
             * the foot by overwriting a database file ...
             */
            if (!is_absolute_path(filename))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_NAME),
                         errmsg("relative path not allowed for COPY to file")));

            /*
             * Mask off group/other write permission only while the file is
             * being created.  AllocateFile() can raise an error on its own
             * (rather than returning NULL), so the previous umask must be
             * restored on that path as well; otherwise this backend would
             * keep running with the modified umask.
             */
            oumask = umask(S_IWGRP | S_IWOTH);
            PG_TRY();
            {
                cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
            }
            PG_CATCH();
            {
                umask(oumask);
                PG_RE_THROW();
            }
            PG_END_TRY();
            umask(oumask);

            if (cstate->copy_file == NULL)
            {
                /* copy errno because ereport subfunctions might change it */
                int            save_errno = errno;

                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not open file \"%s\" for writing: %m",
                                cstate->filename),
                         (save_errno == ENOENT || save_errno == EACCES) ?
                         errhint("COPY TO instructs the PostgreSQL server process to write a file. "
                                 "You may want a client-side facility such as psql's \\copy.") : 0));
            }

            /* Opening a directory can succeed on some platforms; reject it. */
            if (fstat(fileno(cstate->copy_file), &st))
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not stat file \"%s\": %m",
                                cstate->filename)));

            if (S_ISDIR(st.st_mode))
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                         errmsg("\"%s\" is a directory", cstate->filename)));
        }
    }

    MemoryContextSwitchTo(oldcontext);

    return cstate;
}

/*
 * DoCopyTo
 *
 * Run CopyTo() for the given state and return the value it reports.
 *
 * This intermediate routine exists mainly to localize the effects of setjmp
 * so we don't need to plaster a lot of variables with "volatile".
 */
static uint64
DoCopyTo(CopyState cstate)
{
    /*
     * The copy-begin/copy-end messages are sent only when no file name was
     * given and output is directed to DestRemote.
     */
    bool        fe_copy = (cstate->filename == NULL &&
                           whereToSendOutput == DestRemote);
    uint64        nrows;

    PG_TRY();
    {
        if (!fe_copy)
            nrows = CopyTo(cstate);
        else
        {
            SendCopyBegin(cstate);
            nrows = CopyTo(cstate);
            SendCopyEnd(cstate);
        }
    }
    PG_CATCH();
    {
        /*
         * On error, always leave old-style COPY OUT mode.  Calling this
         * unconditionally is safe: it has no effect when the mode is not
         * active.
         */
        pq_endcopyout(true);
        PG_RE_THROW();
    }
    PG_END_TRY();

    return nrows;
}

/*
 * EndCopyTo
 *
 * Clean up storage and release resources for COPY TO.  When the copy was
 * driven by a query descriptor, the executor is shut down, the descriptor
 * freed and the active snapshot popped before the generic EndCopy()
 * cleanup runs.
 */
static void
EndCopyTo(CopyState cstate)
{
    /* No query descriptor: only the common cleanup is needed. */
    if (cstate->queryDesc == NULL)
    {
        EndCopy(cstate);
        return;
    }

    /* Shut the query down, free its descriptor, then drop the snapshot. */
    ExecutorFinish(cstate->queryDesc);
    ExecutorEnd(cstate->queryDesc);
    FreeQueryDesc(cstate->queryDesc);
    PopActiveSnapshot();

    /* Common storage cleanup */
    EndCopy(cstate);
}

/*
 * Copy from relation or query TO file.
 */
static uint64
CopyTo(CopyState cstate)
{// #lizard forgives
    TupleDesc    tupDesc;
    int            num_phys_attrs;
    Form_pg_attribute *attr;
    ListCell   *cur;
    uint64        processed;

    if (cstate->rel)
        tupDesc = RelationGetDescr(cstate->rel);
    else
        tupDesc = cstate->queryDesc->tupDesc;
    attr = tupDesc->attrs;
    num_phys_attrs = tupDesc->natts;
    cstate->null_print_client = cstate->null_print; /* default */

    /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
    cstate->fe_msgbuf = makeStringInfo();

    /* Get info about the columns we need to process. */
    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    foreach(cur, cstate->attnumlist)
    {
        int            attnum = lfirst_int(cur);
        Oid            out_func_oid;
        bool        isvarlena;

        if (cstate->binary)
            getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
                                    &out_func_oid,
                                    &isvarlena);
        else
            getTypeOutputInfo(attr[attnum - 1]->atttypid,
                              &out_func_oid,
                              &isvarlena);
        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
    }

    /*
     * Create a temporary memory context that we can reset once per row to
     * recover palloc'd memory.  This avoids any problems with leaks inside
     * datatype output routines, and should be faster than retail pfree's
     * anyway.  (We don't need a whole econtext as CopyFrom does.)
     */
    cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
                                               "COPY TO",
                                               ALLOCSET_DEFAULT_SIZES);

    if (cstate->binary)
    {
#ifdef PGXC
    if (IS_PGXC_COORDINATOR)
    {
#endif
        /* Generate header for a binary copy */
        int32        tmp;

        /* Signature */
        CopySendData(cstate, BinarySignature, 11);
        /* Flags field */
        tmp = 0;
        if (cstate->oids)
            tmp |= (1 << 16);
        CopySendInt32(cstate, tmp);
        /* No header extension */
        tmp = 0;
        CopySendInt32(cstate, tmp);

#ifdef PGXC
        /* Need to flush out the trailer */
        CopySendEndOfRow(cstate);
    }
#endif
    }
    else
    {
        /*
         * For non-binary copy, we need to convert null_print to file
         * encoding, because it will be sent directly with CopySendString.
         */
        if (cstate->need_transcoding)
            cstate->null_print_client = pg_server_to_any(cstate->null_print,
                                                         cstate->null_print_len,
                                                         cstate->file_encoding);

        /* if a header has been requested send the line */
        if (cstate->header_line)
        {
            bool        hdr_delim = false;

            foreach(cur, cstate->attnumlist)
            {
                int            attnum = lfirst_int(cur);
                char       *colname;

                if (hdr_delim)
                    CopySendChar(cstate, cstate->delim[0]);
                hdr_delim = true;

                colname = NameStr(attr[attnum - 1]->attname);

                CopyAttributeOutCSV(cstate, colname, false,
                                    list_length(cstate->attnumlist) == 1);
            }

            CopySendEndOfRow(cstate);
        }
    }

#ifdef PGXC
    if (IS_PGXC_COORDINATOR &&
        cstate->remoteCopyState &&
        cstate->remoteCopyState->rel_loc)
    {
        RemoteCopyData *rcstate = cstate->remoteCopyState;
        processed = DataNodeCopyOut(
                    (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator),
                    getLocatorNodeCount(rcstate->locator),
                    cstate->copy_dest == COPY_FILE ? cstate->copy_file : NULL);
    }
    else
    {
#endif

    if (cstate->rel)
    {
        Datum       *values;
        bool       *nulls;
        HeapScanDesc scandesc;
        HeapTuple    tuple;
#ifdef __OPENTENBASE__
        int         i = 0;
        int32       nchildren = 0;
        Bitmapset   *children = NULL;
        Oid            partoid;
#endif
#ifdef _MLS_
        int16       cls_attnum = InvalidAttrNumber;
        Oid         parent_oid = InvalidOid;
        bool        has_datamask;
#endif
#ifdef _SHARDING_
        ShardID        shardid;
        AttrNumber    diskey = InvalidAttrNumber;
        AttrNumber    secdiskey = InvalidAttrNumber;
        int            error_shards = 0;

        if(RelationIsSharded(cstate->rel))
        {
            diskey = RelationGetDisKey(cstate->rel);

            if(diskey > RelationGetDescr(cstate->rel)->natts)
            {
                elog(ERROR, "AttrNum[%d] of distribute key is invalid, ",diskey);
            }

            secdiskey = RelationGetSecDisKey(cstate->rel);
        }
#endif
#ifdef _MLS_
        if (cstate->rel->rd_cls_struct)
        {
            cls_attnum = cstate->rel->rd_cls_struct->attnum;
        }
#endif
#ifdef __OPENTENBASE__
        //partitioned table
        cstate->nparts = 0;
        cstate->partrels = NULL;
        
        /* open partitions */
        if(RELATION_IS_INTERVAL(cstate->rel))
        {
#ifdef _MLS_            
            parent_oid = RelationGetRelid(cstate->rel);
#endif
            /* skip partitions by cold data strategy */
            nchildren = RelationGetNParts(cstate->rel);
            children  = bms_init_set(nchildren);
#ifdef __COLD_HOT__
            PruneHotData(cstate->rel->rd_id, children);
#endif

            cstate->partrels = (Relation *)palloc(nchildren * sizeof(Relation));
            cstate->nparts = 0;
            for(i = 0; i < nchildren; i++)
            {
                if (bms_is_member(i, children))
                {
                    partoid = RelationGetPartition(cstate->rel, i, false);
                    if(!OidIsValid(partoid))
					{
						//elog(ERROR, "internal error: relation has not this partition[%d]", i);
						continue;
					}
                    cstate->partrels[cstate->nparts] = heap_open(partoid, AccessShareLock);
                    
                    /* increase partition number */
                    cstate->nparts++;
                }
            }
            bms_free(children);
        }
        else
        {
            parent_oid   = mls_get_parent_oid(cstate->rel);
        }
        
        has_datamask = datamask_check_table_has_datamask(parent_oid);
#endif

        values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
        nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));

#ifdef __OPENTENBASE__
        if(cstate->nparts > 0)
        {
            int shardid;
            i = 0;
            processed = 0;
            
            for(i = 0; i < cstate->nparts; i++)
            {
                scandesc = heap_beginscan(cstate->partrels[i], GetActiveSnapshot(), 0, NULL);
                while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
                {
                    bool isdeformed = false;
                    CHECK_FOR_INTERRUPTS();

                    /* does this tuple belong to the shard group? */
                    if(cstate->shard_array > 0)
                    {
                        shardid = (int)HeapTupleGetShardId(tuple);                    
                                    
                        if(shardid >= 0 && !bms_is_member(shardid, cstate->shard_array))
                            continue;
                        
                        //recompute shardid
                        if(!ShardIDIsValid(shardid) && RelationIsSharded(cstate->rel))
                        {
                            Datum value;
                            bool  isdisnull;
                            Oid   typeOfDistCol;
                            Datum secValue;
                            bool  issecnull;
                            Oid   typeOfSecDistCol;
                            
                            //elog(INFO, "[shardiderror] shardid cannot be invalid in shard tables %s.", RelationGetRelationName(cstate->partrels[i]));
#ifdef _MLS_
                            if (NULL != cstate->rel->rd_att->transp_crypt)
                            {
                            	trsprt_crypt_dcrpt_all_col_vale_cp(tuple, cstate->rel->rd_att, values, nulls);
                            }
                            else
                            {
#endif                                
                                heap_deform_tuple(tuple, tupDesc, values, nulls);
#ifdef _MLS_
                            }
#endif                            
                            isdeformed = true;
                            
                            /* process sharding mapping */
                            if (diskey != InvalidAttrNumber) 
                            {
                                typeOfDistCol = RelationGetDescr(cstate->rel)->attrs[diskey - 1]->atttypid;
                                value          = values[diskey - 1];
                                isdisnull      = nulls[diskey - 1];
                            }
                            else
                            {
                                typeOfDistCol = InvalidOid;
                                value          = 0;
                                isdisnull      = true;
                            }
                            
                            if (secdiskey != InvalidAttrNumber)
                            {
                                typeOfSecDistCol = RelationGetDescr(cstate->rel)->attrs[secdiskey - 1]->atttypid;
                                secValue         = values[secdiskey - 1];
                                issecnull         = nulls[secdiskey - 1];
                            }
                            else
                            {
                                typeOfSecDistCol = InvalidOid;
                                secValue          = 0;
                                issecnull      = true;
                            }
                            
        
                            shardid = EvaluateShardId(typeOfDistCol, isdisnull, value,
                                                      typeOfSecDistCol, issecnull, secValue, RelationGetRelid(cstate->rel));
                            //HeapTupleHeaderSetShardId(tuple, shardId);        

                            if(shardid >= 0 && !bms_is_member(shardid, cstate->shard_array))
                                continue;
        
                            if(shardid < 0)
                            {
                                elog(ERROR, "recompute shardid failed. shardid is %d", shardid);
                            }
                        }

                    }
                    /* Deconstruct the tuple ... faster than repeated heap_getattr */
                    if(!isdeformed)
#ifdef _MLS_                        
                    {
                        if (NULL != cstate->rel->rd_att->transp_crypt)
                        {
                        	trsprt_crypt_dcrpt_all_col_vale_cp(tuple, cstate->rel->rd_att, values, nulls);
                        }
                        else
                        {
#endif       
                            heap_deform_tuple(tuple, tupDesc, values, nulls);
#ifdef _MLS_
                        }
                    }
#endif                        

#ifdef _MLS_
                    if (has_datamask)
                    {
                        dmask_exchg_all_cols_value_copy(cstate->rel->rd_att, values, nulls, parent_oid);
                    }
                    if (InvalidAttrNumber != cls_attnum)
                    {
                        if (!mls_cls_check_row_validation_in_cp(values[cls_attnum -1 ]))
                        {
                            continue;
                        }
                    }
#endif

                    /* Format and send the data */
                    CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
                    processed++;
                }
                heap_endscan(scandesc);
                scandesc = NULL;
            }

            for(i = 0; i < cstate->nparts; i++)
            {
                heap_close(cstate->partrels[i], AccessShareLock);
            }
            
            pfree(cstate->partrels);
            cstate->partrels = NULL;
        }
        else
        {
#endif    

        scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);

        processed = 0;
        while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
        {
            CHECK_FOR_INTERRUPTS();
#ifdef _MLS_
            if (NULL != cstate->rel->rd_att->transp_crypt)
            {
            	trsprt_crypt_dcrpt_all_col_vale_cp(tuple, cstate->rel->rd_att, values, nulls);
            }
            else
            {
#endif
                /* Deconstruct the tuple ... faster than repeated heap_getattr */
                heap_deform_tuple(tuple, tupDesc, values, nulls);
#ifdef _MLS_
            }
#endif            
#ifdef _SHARDING_
            if(cstate->shard_array)
            {
                shardid = (int)HeapTupleGetShardId(tuple);

                if(ShardIDIsValid(shardid) && !bms_is_member(shardid, cstate->shard_array))
                    continue;
                
                //recompute shardid
                if(!ShardIDIsValid(shardid) && RelationIsSharded(cstate->rel))
                {
                    Datum value;
                    bool  isdisnull;
                    Oid   typeOfDistCol;
                    Datum secValue;
                    bool  issecnull;
                    Oid   typeOfSecDistCol;

                    error_shards++;

                    /* process sharding mapping */
                    typeOfDistCol = RelationGetDescr(cstate->rel)->attrs[diskey - 1]->atttypid;
                    value           = values[diskey - 1];
                    isdisnull      = nulls[diskey - 1];

                    if (secdiskey != InvalidAttrNumber)
                    {
                        typeOfSecDistCol = RelationGetDescr(cstate->rel)->attrs[secdiskey - 1]->atttypid;
                        secValue         = values[secdiskey - 1];
                        issecnull         = nulls[secdiskey - 1];
                    }
                    else
                    {
                        typeOfSecDistCol = InvalidOid;
                        secValue          = 0;
                        issecnull      = true;
                    }

                    shardid = EvaluateShardId(typeOfDistCol, isdisnull, value,
                                              typeOfSecDistCol, issecnull, secValue, RelationGetRelid(cstate->rel));
                    if(ShardIDIsValid(shardid) && !bms_is_member(shardid, cstate->shard_array))
                        continue;
                    
                    if(!ShardIDIsValid(shardid))
                    {
                        elog(ERROR, "recompute shardid failed. shardid is %d", shardid);
                    }
                }
                
            }
#endif
#ifdef _MLS_
            if (has_datamask)
            {
            	dmask_exchg_all_cols_value_copy(cstate->rel->rd_att, values, nulls, parent_oid);
            }
            if (InvalidAttrNumber != cls_attnum)
            {
                if (!mls_cls_check_row_validation_in_cp(values[cls_attnum -1 ]))
                {
                    continue;
                }
            }
#endif
            /* Format and send the data */
            CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
            processed++;
        }

        heap_endscan(scandesc);
#ifdef __OPENTENBASE__
        }
#endif

        if(error_shards > 0)
            elog(WARNING, "there are %d tuples's shardid which shardis in invalid in relation %s(%d).", 
                    error_shards,
                    RelationGetRelationName(cstate->rel),
                    RelationGetRelid(cstate->rel));

        pfree(values);
        pfree(nulls);
    }
    else
    {
        /* run the plan --- the dest receiver will send tuples */
        ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
        processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
    }

#ifdef PGXC
    }
#endif

#ifdef PGXC
    /*
     * In PGXC, it is not necessary for a Datanode to generate
     * the trailer as Coordinator is in charge of it
     */
    if (cstate->binary && IS_PGXC_COORDINATOR)
#else
    if (cstate->binary)
#endif
    {
        /* Generate trailer for a binary copy */
        CopySendInt16(cstate, -1);
        /* Need to flush out the trailer */
        CopySendEndOfRow(cstate);
    }

    MemoryContextDelete(cstate->rowcontext);

    return processed;
}

/*
 * Emit one row during CopyTo().
 *
 * values[] and nulls[] are indexed by (attribute number - 1) for every
 * entry of cstate->attnumlist.  tupleOid is only sent when cstate->oids
 * is set.  The row context is reset on entry and is the current memory
 * context while the row is being built; the caller's context is restored
 * before returning.
 */
static void
CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
{// #lizard forgives
    bool        is_binary = cstate->binary;
    bool        delim_pending = false;
    int         natts = list_length(cstate->attnumlist);
    FmgrInfo   *outfuncs = cstate->out_functions;
    MemoryContext callercontext;
    ListCell   *lc;

    MemoryContextReset(cstate->rowcontext);
    callercontext = MemoryContextSwitchTo(cstate->rowcontext);

    /*
     * Row prologue: binary format carries a field count (and optionally the
     * OID as a length-prefixed field); text format has no header but may
     * lead with the OID as the first column.
     */
    if (is_binary)
    {
        CopySendInt16(cstate, natts);
        /* attnumlist never includes the OID, so it is sent separately */
        if (cstate->oids)
        {
            /* Hack --- assume Oid is same size as int32 */
            CopySendInt32(cstate, sizeof(int32));
            CopySendInt32(cstate, tupleOid);
        }
    }
    else if (cstate->oids)
    {
        /* Assume digits don't need any quoting or encoding conversion */
        char       *oidtext;

        oidtext = DatumGetCString(DirectFunctionCall1(oidout,
                                                      ObjectIdGetDatum(tupleOid)));
        CopySendString(cstate, oidtext);
        delim_pending = true;
    }

    foreach(lc, cstate->attnumlist)
    {
        int         idx = lfirst_int(lc) - 1;
        Datum       datum = values[idx];
        bool        datum_is_null = nulls[idx];

        if (is_binary)
        {
            /* Binary field: int32 length (-1 for NULL) followed by the bytes */
            if (datum_is_null)
            {
                CopySendInt32(cstate, -1);
            }
            else
            {
                bytea      *wire;

                wire = SendFunctionCall(&outfuncs[idx], datum);
                CopySendInt32(cstate, VARSIZE(wire) - VARHDRSZ);
                CopySendData(cstate, VARDATA(wire),
                             VARSIZE(wire) - VARHDRSZ);
            }
            continue;
        }

        /* Text/CSV field: delimiter goes before every column but the first */
        if (delim_pending)
            CopySendChar(cstate, cstate->delim[0]);
        delim_pending = true;

        if (datum_is_null)
        {
            CopySendString(cstate, cstate->null_print_client);
            continue;
        }

        {
            char       *text = OutputFunctionCall(&outfuncs[idx], datum);

            if (cstate->csv_mode)
                CopyAttributeOutCSV(cstate, text,
                                    cstate->force_quote_flags[idx],
                                    natts == 1);
            else
                CopyAttributeOutText(cstate, text);
        }
    }

    CopySendEndOfRow(cstate);

    MemoryContextSwitchTo(callercontext);
}


/*
 * error context callback for COPY FROM
 *
 * The argument for the error context must be CopyState.
 */
void
CopyFromErrorCallback(void *arg)
{// #lizard forgives
    CopyState    cstate = (CopyState) arg;

    if (cstate->binary)
    {
        /* can't usefully display the data */
        if (cstate->cur_attname)
            errcontext("COPY %s, line %d, column %s, nodetype:%d(1:cn,0:dn)",
                       cstate->cur_relname, cstate->cur_lineno,
                       cstate->cur_attname, IS_PGXC_COORDINATOR);
        else
            errcontext("COPY %s, line %d, nodetype:%d(1:cn,0:dn)",
                       cstate->cur_relname, cstate->cur_lineno, IS_PGXC_COORDINATOR);
    }
    else
    {
        if (cstate->cur_attname && cstate->cur_attval)
        {
            /* error is relevant to a particular column */
            char       *attval;

            attval = limit_printout_length(cstate->cur_attval);
            errcontext("COPY %s, line %d, column %s: \"%s\", nodetype:%d(1:cn,0:dn)",
                       cstate->cur_relname, cstate->cur_lineno,
                       cstate->cur_attname, attval, IS_PGXC_COORDINATOR);
            pfree(attval);
        }
        else if (cstate->cur_attname)
        {
            /* error is relevant to a particular column, value is NULL */
            errcontext("COPY %s, line %d, column %s: null input, nodetype:%d(1:cn,0:dn)",
                       cstate->cur_relname, cstate->cur_lineno,
                       cstate->cur_attname, IS_PGXC_COORDINATOR);
        }
        else
        {
            /*
             * Error is relevant to a particular line.
             *
             * If line_buf still contains the correct line, and it's already
             * transcoded, print it. If it's still in a foreign encoding, it's
             * quite likely that the error is precisely a failure to do
             * encoding conversion (ie, bad data). We dare not try to convert
             * it, and at present there's no way to regurgitate it without
             * conversion. So we have to punt and just report the line number.
             */
            if (cstate->line_buf_valid &&
                (cstate->line_buf_converted || !cstate->need_transcoding))
            {
                char       *lineval;

                lineval = limit_printout_length(cstate->line_buf.data);
                errcontext("COPY %s, line %d: \"%s\", nodetype:%d(1:cn,0:dn)",
                           cstate->cur_relname, cstate->cur_lineno, lineval, IS_PGXC_COORDINATOR);
                pfree(lineval);
            }
            else
            {
                errcontext("COPY %s, line %d, nodetype:%d(1:cn,0:dn)",
                           cstate->cur_relname, cstate->cur_lineno, IS_PGXC_COORDINATOR);
            }
        }
    }
}

/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * Using the sprintf "precision" limit to truncate would look simpler, but
 * some versions of glibc have a bug/misfeature where vsnprintf always fails
 * (returns -1) when asked to truncate a string containing byte sequences
 * that are invalid in the current encoding.  So the truncation is done here
 * by hand.
 *
 * The result is always a freshly allocated string: either a pstrdup'd copy
 * of the whole input, or a clipped prefix followed by "...".
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

    int         input_len = strlen(str);

    if (input_len > MAX_COPY_DATA_DISPLAY)
    {
        /* Clip with the encoding-aware helper, then append the marker */
        int         keep = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);
        char       *clipped = (char *) palloc(keep + 4);

        memcpy(clipped, str, keep);
        /* 4 bytes: the three dots plus the terminating NUL */
        memcpy(clipped + keep, "...", 4);

        return clipped;
    }

    /* Short enough already: hand back a plain copy */
    return pstrdup(str);
}

/*
 * Copy FROM file to relation.
 */
uint64
CopyFrom(CopyState cstate)
{// #lizard forgives
    HeapTuple    tuple;
    TupleDesc    tupDesc;
    Datum       *values;
    bool       *nulls;
    ResultRelInfo *resultRelInfo;
    ResultRelInfo *saved_resultRelInfo = NULL;
    EState       *estate = CreateExecutorState(); /* for ExecConstraints() */
    ExprContext *econtext;
    TupleTableSlot *myslot;
    MemoryContext oldcontext = CurrentMemoryContext;

    ErrorContextCallback errcallback;
    CommandId    mycid = GetCurrentCommandId(true);
    int            hi_options = 0; /* start with default heap_insert options */
    BulkInsertState bistate;
    uint64        processed = 0;
    bool        useHeapMultiInsert;
    int            nBufferedTuples = 0;
    int            prev_leaf_part_index = -1;

#define MAX_BUFFERED_TUPLES 1000
    HeapTuple  *bufferedTuples = NULL;    /* initialize to silence warning */
    Size        bufferedTuplesSize = 0;
    int            firstBufferedLineNo = 0;
#ifdef __OPENTENBASE__
    BulkInsertState *part_bistates = NULL;
    HeapTuple    **part_bufferedTuples = NULL;
    int         *part_nBufferedTuples = NULL;
    Size        *part_bufferedTuplesSize = NULL;

    AttrNumber partkey = InvalidAttrNumber;
    Datum        partvalue;
    bool        isnull = false;
    int         partidx = -1;
    ResultRelInfo    *partRel = NULL;
    ResultRelInfo *tempRelInfo;
    Relation        tempparent = NULL;
    int         npart             = 0;
    bool        need_to_reset     = false;
    bool        nomore            = false;
#endif

    Assert(cstate->rel);

    /*
     * The target must be a plain relation or have an INSTEAD OF INSERT row
     * trigger.  (Currently, such triggers are only allowed on views, so we
     * only hint about them in the view case.)
     */
    if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
        cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
        !(cstate->rel->trigdesc &&
          cstate->rel->trigdesc->trig_insert_instead_row))
    {
        if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy to view \"%s\"",
                            RelationGetRelationName(cstate->rel)),
                     errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
        else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy to materialized view \"%s\"",
                            RelationGetRelationName(cstate->rel))));
        else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy to foreign table \"%s\"",
                            RelationGetRelationName(cstate->rel))));
        else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy to sequence \"%s\"",
                            RelationGetRelationName(cstate->rel))));
        else
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy to non-table relation \"%s\"",
                            RelationGetRelationName(cstate->rel))));
    }

#if 0
    if ((cstate->rel->rd_rel->relkind == RELKIND_RELATION ||
        cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) &&
        (cstate->rel->trigdesc && (cstate->rel->trigdesc->trig_insert_instead_row ||
        cstate->rel->trigdesc->trig_insert_before_row || cstate->rel->trigdesc->trig_insert_after_row ||
        cstate->rel->trigdesc->trig_insert_before_statement || cstate->rel->trigdesc->trig_insert_after_statement ||
        cstate->rel->trigdesc->trig_insert_new_table)))
    {
        elog(ERROR, "cannot copy to table \"%s\" with trigger", RelationGetRelationName(cstate->rel));
    }
#endif

    tupDesc = RelationGetDescr(cstate->rel);

    /*----------
     * Check to see if we can avoid writing WAL
     *
     * If archive logging/streaming is not enabled *and* either
     *    - table was created in same transaction as this COPY
     *    - data is being written to relfilenode created in this transaction
     * then we can skip writing WAL.  It's safe because if the transaction
     * doesn't commit, we'll discard the table (or the new relfilenode file).
     * If it does commit, we'll have done the heap_sync at the bottom of this
     * routine first.
     *
     * As mentioned in comments in utils/rel.h, the in-same-transaction test
     * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
     * can be cleared before the end of the transaction. The exact case is
     * when a relation sets a new relfilenode twice in same transaction, yet
     * the second one fails in an aborted subtransaction, e.g.
     *
     * BEGIN;
     * TRUNCATE t;
     * SAVEPOINT save;
     * TRUNCATE t;
     * ROLLBACK TO save;
     * COPY ...
     *
     * Also, if the target file is new-in-transaction, we assume that checking
     * FSM for free space is a waste of time, even if we must use WAL because
     * of archiving.  This could possibly be wrong, but it's unlikely.
     *
     * The comments for heap_insert and RelationGetBufferForTuple specify that
     * skipping WAL logging is only safe if we ensure that our tuples do not
     * go into pages containing tuples from any other transactions --- but this
     * must be the case if we have a new table or new relfilenode, so we need
     * no additional work to enforce that.
	 *
	 * We currently don't support this optimization if the COPY target is a
	 * partitioned table as we currently only lazily initialize partition
	 * information when routing the first tuple to the partition.  We cannot
	 * know at this stage if we can perform this optimization.  It should be
	 * possible to improve on this, but it does mean maintaining heap insert
	 * option flags per partition and setting them when we first open the
	 * partition.
     *----------
     */
    /* createSubid is creation check, newRelfilenodeSubid is truncation check */
	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
    {
        hi_options |= HEAP_INSERT_SKIP_FSM;
        if (!XLogIsNeeded())
            hi_options |= HEAP_INSERT_SKIP_WAL;
    }

    /*
     * Optimize if new relfilenode was created in this subxact or one of its
     * committed children and we won't see those rows later as part of an
     * earlier scan or command. This ensures that if this subtransaction
     * aborts then the frozen rows won't be visible after xact cleanup. Note
     * that the stronger test of exactly which subtransaction created it is
     * crucial for correctness of this optimization.
     */
    if (cstate->freeze)
    {
		/*
		* We currently disallow COPY FREEZE on partitioned tables.  The
		* reason for this is that we've simply not yet opened the partitions
		* to determine if the optimization can be applied to them.  We could
		* go and open them all here, but doing so may be quite a costly
		* overhead for small copies.  In any case, we may just end up routing
		* tuples to a small number of partitions.  It seems better just to
		* raise an ERROR for partitioned tables.
		*/
		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
		   ereport(ERROR,
		       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
		           errmsg("cannot perform FREEZE on a partitioned table")));
		}

        if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                     errmsg("cannot perform FREEZE because of prior transaction activity")));

        if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
            cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));

        hi_options |= HEAP_INSERT_FROZEN;
    }

    /*
     * We need a ResultRelInfo so we can use the regular executor's
     * index-entry-making machinery.  (There used to be a huge amount of code
     * here that basically duplicated execUtils.c ...)
     */
    resultRelInfo = makeNode(ResultRelInfo);
#ifdef __OPENTENBASE__
    if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel))
    {
        resultRelInfo->ispartparent = true;
        resultRelInfo->partpruning = RelationGetPartitionsByQuals(cstate->rel,NULL);
    }
    else
    {
        resultRelInfo->ispartparent = false;
        resultRelInfo->partpruning = NULL;
    }
#endif

    InitResultRelInfo(resultRelInfo,
                      cstate->rel,
                      1,        /* dummy rangetable index */
                      NULL,
                      0);

    ExecOpenIndices(resultRelInfo, false);

    estate->es_result_relations = resultRelInfo;
    estate->es_num_result_relations = 1;
    estate->es_result_relation_info = resultRelInfo;
    estate->es_range_table = cstate->range_table;

    /* Set up a tuple slot too */
    myslot = ExecInitExtraTupleSlot(estate);
    ExecSetSlotDescriptor(myslot, tupDesc);
    /* Triggers might need a slot as well */
    estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);

    /*
     * It's more efficient to prepare a bunch of tuples for insertion, and
     * insert them in one heap_multi_insert() call, than call heap_insert()
     * separately for every tuple. However, we can't do that if there are
     * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
     * expressions. Such triggers or expressions might query the table we're
     * inserting to, and act differently if the tuples that have already been
     * processed and prepared for insertion are not there.  We also can't do
     * it if the table is partitioned.
     */
    if ((resultRelInfo->ri_TrigDesc != NULL &&
         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
          resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
		cstate->partition_tuple_routing != NULL ||
        cstate->volatile_defexprs)
    {
        useHeapMultiInsert = false;
    }
    else
    {
        useHeapMultiInsert = true;
#ifdef __OPENTENBASE__
        if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel))
        {
            int i;
            part_bufferedTuples = (HeapTuple **)palloc0(RelationGetNParts(cstate->rel)*sizeof(HeapTuple*));
            for(i = 0; i < RelationGetNParts(cstate->rel); i++)
            {
                part_bufferedTuples[i] = (HeapTuple *)palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
            }

            part_bufferedTuplesSize = (Size *)palloc0(RelationGetNParts(cstate->rel) * sizeof(Size));
            part_nBufferedTuples = (int *)palloc0(RelationGetNParts(cstate->rel) * sizeof(int));

            npart = RelationGetNParts(cstate->rel);
        }
#endif
        bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
    }

    /* Prepare to catch AFTER triggers. */
    AfterTriggerBeginQuery();

    /*
     * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
     * should do this for COPY, since it's not really an "INSERT" statement as
     * such. However, executing these triggers maintains consistency with the
     * EACH ROW triggers that we already fire on COPY.
     */
    ExecBSInsertTriggers(estate, resultRelInfo);

    values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
    nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

#ifdef __OPENTENBASE__
    if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel))
    {
        int i = 0;
        part_bistates = (BulkInsertState *)palloc(RelationGetNParts(cstate->rel) * sizeof(BulkInsertState));
        for(i = 0; i < RelationGetNParts(cstate->rel); i++)
        {
            part_bistates[i] = GetBulkInsertState_part(npart);
        }
        /* for default partition */
        bistate = GetBulkInsertState_part(npart);
    }
    else
    {
#endif

    bistate = GetBulkInsertState();
#ifdef __OPENTENBASE__
    }
#endif
    econtext = GetPerTupleExprContext(estate);

    /* Set up callback to identify error line number */
    errcallback.callback = CopyFromErrorCallback;
    errcallback.arg = (void *) cstate;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    for (;;)
    {
        TupleTableSlot *slot;
        bool        skip_tuple;
        Oid            loaded_oid = InvalidOid;

        CHECK_FOR_INTERRUPTS();
#ifdef __OPENTENBASE__
        if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel) && useHeapMultiInsert)
        {
            if(need_to_reset)
            {
                ResetPerTupleExprContext(estate);
                need_to_reset = false;
            }
        }
        else
        {
#endif

        if (nBufferedTuples == 0)
        {
            /*
             * Reset the per-tuple exprcontext. We can only do this if the
             * tuple buffer is empty. (Calling the context the per-tuple
             * memory context is a bit of a misnomer now.)
             */
            ResetPerTupleExprContext(estate);
        }
#ifdef __OPENTENBASE__
        }
#endif

        /* Switch into its memory context */
        MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
   
#ifdef __OPENTENBASE__
        /* skip error line when copy from */

        if (g_enable_copy_silence)
        {
readnextline:            
            PG_TRY();
            {
                if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
                {
                    nomore = true;
                }
            }
            PG_CATCH();
            {    
#ifdef __OPENTENBASE__
                /* 
                 * errors that occurred in a COPY FROM transformed from a
                 * multi-values INSERT INTO must be re-thrown
                 */
                if (cstate->insert_into)
                {
                    PG_RE_THROW();
                }
                else
                {
#endif
                HOLD_INTERRUPTS();
                EmitErrorReport();
                FlushErrorState();
                RESUME_INTERRUPTS();
                /*
                 * omit errors from input files, PG_exception_stack has been reset.
                 */
                goto readnextline;
#ifdef __OPENTENBASE__
                }
#endif
            }
            PG_END_TRY();    

            /*
             * after PG_END_TRY(), break action would be allowed.
             */
            if (nomore)
            {
                break;
            }            
        }
        else
        {
#endif                    
            if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
                break;
#ifdef __OPENTENBASE__
        }
#endif


#ifdef PGXC
        /*
         * Send the data row as-is to the Datanodes. If default values
         * are to be inserted, append them onto the data row.
         */
        if (IS_PGXC_COORDINATOR && cstate->remoteCopyState->rel_loc)
        {
            Datum                 value = (Datum) 0;
            bool                isnull = true;
            RemoteCopyData        *rcstate = cstate->remoteCopyState;
            AttrNumber            dist_col = rcstate->rel_loc->partAttrNum;
#ifdef __COLD_HOT__
            Datum                 secValue = (Datum) 0;
            bool                secisnull = true;
#endif

            if (AttributeNumberIsValid(dist_col))
            {
                value = values[dist_col-1];
                isnull = nulls[dist_col-1];
            }

#ifdef __COLD_HOT__
            if (AttributeNumberIsValid(rcstate->rel_loc->secAttrNum))
            {
                secValue = values[rcstate->rel_loc->secAttrNum - 1];
                secisnull = nulls[rcstate->rel_loc->secAttrNum - 1];
            }
#endif

            if (DataNodeCopyIn(cstate->line_buf.data,
                               cstate->line_buf.len,
#ifdef __COLD_HOT__
                               GET_NODES(rcstate->locator, value, isnull, secValue, secisnull, NULL),
#else
                               GET_NODES(rcstate->locator, value, isnull, NULL),
#endif
                               (PGXCNodeHandle**) getLocatorResults(rcstate->locator),
#ifdef __OPENTENBASE__
                               (cstate->binary || cstate->insert_into)))
#else
                                  cstate->binary))
#endif
            {
                int conn_count;
                int loop;
                PGXCNodeHandle** copy_connections;
                StringInfoData   sqldata;
                StringInfo       sql; 

                sql = &sqldata;

                initStringInfo(sql);
#ifdef __COLD_HOT__
                conn_count = GET_NODES(rcstate->locator, value, isnull, secValue, secisnull, NULL);
#else
                conn_count = GET_NODES(rcstate->locator, value, isnull, NULL);
#endif
                copy_connections = (PGXCNodeHandle**)getLocatorResults(rcstate->locator);
                for(loop = 0; loop < conn_count; loop++)
                {
                    PGXCNodeHandle *handle = copy_connections[loop];
                    if ('\0' != handle->error[0])
                    {
                        appendStringInfo(sql, "%s;", handle->error);
                    }
                }

                ereport(ERROR,
                            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                             errmsg("Copy failed on a data node:%s", sql->data)));
            }
            processed++;
        }
        else
        {
#endif
        /* And now we can form the input tuple. */
#ifdef _SHARDING_
        if(RelationIsSharded(cstate->rel))
        {            
            tuple = heap_form_tuple_plain(tupDesc,values,nulls,RelationGetDisKey(cstate->rel),
                                          RelationGetSecDisKey(cstate->rel), RelationGetRelid(cstate->rel));

            if(IsShardBarriered(cstate->rel->rd_node, HeapTupleGetShardId(tuple)))
            {
                elog(ERROR, "shard is be vacuuming now, so it is forbidden to write.");
            }
        }
        else
        {
            tuple = heap_form_tuple(tupDesc, values, nulls);
        }
#endif
        if (loaded_oid != InvalidOid)
            HeapTupleSetOid(tuple, loaded_oid);

        /*
         * Constraints might reference the tableoid column, so initialize
         * t_tableOid before evaluating them.
         */
        tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);

        /* Triggers and stuff need to be invoked in query context. */
        MemoryContextSwitchTo(oldcontext);

        /* Place tuple in tuple slot --- but slot shouldn't free it */
        slot = myslot;
        ExecStoreTuple(tuple, slot, InvalidBuffer, false);

        /* Determine the partition to heap_insert the tuple into */
		if (cstate->partition_tuple_routing)
        {
            int            leaf_part_index;
			PartitionTupleRouting *proute = cstate->partition_tuple_routing;

            /*
             * Away we go ... If we end up not finding a partition after all,
             * ExecFindPartition() does not return and errors out instead.
             * Otherwise, the returned value is to be used as an index into
             * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
             * will get us the ResultRelInfo and TupleConversionMap for the
             * partition, respectively.
             */
            leaf_part_index = ExecFindPartition(resultRelInfo,
												proute->partition_dispatch_info,
                                                slot,
                                                estate);
            Assert(leaf_part_index >= 0 &&
				   leaf_part_index < proute->num_partitions);

            /*
             * If this tuple is mapped to a partition that is not same as the
             * previous one, we'd better make the bulk insert mechanism gets a
             * new buffer.
             */
            if (prev_leaf_part_index != leaf_part_index)
            {
                ReleaseBulkInsertStatePin(bistate);
                prev_leaf_part_index = leaf_part_index;
            }

            /*
             * Save the old ResultRelInfo and switch to the one corresponding
             * to the selected partition.
             */
            saved_resultRelInfo = resultRelInfo;
			resultRelInfo = proute->partitions[leaf_part_index];
			if (resultRelInfo == NULL)
			{
				resultRelInfo = ExecInitPartitionInfo(NULL,
													  saved_resultRelInfo,
													  proute, estate,
													  leaf_part_index);
				Assert(resultRelInfo != NULL);
			}

            /* We do not yet have a way to insert into a foreign partition */
            if (resultRelInfo->ri_FdwRoutine)
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("cannot route inserted tuples to a foreign table")));

            /*
             * For ExecInsertIndexTuples() to work on the partition's indexes
             */
            estate->es_result_relation_info = resultRelInfo;

            /*
             * If we're capturing transition tuples, we might need to convert
             * from the partition rowtype to parent rowtype.
             */
            if (cstate->transition_capture != NULL)
            {
                if (resultRelInfo->ri_TrigDesc &&
					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
                {
                    /*
					 * If there are any BEFORE triggers on the partition,
					 * we'll have to be ready to convert their result back to
					 * tuplestore format.
                     */
                    cstate->transition_capture->tcs_original_insert_tuple = NULL;
                    cstate->transition_capture->tcs_map =
						TupConvMapForLeaf(proute, saved_resultRelInfo,
										  leaf_part_index);
                }
                else
                {
                    /*
                     * Otherwise, just remember the original unconverted
                     * tuple, to avoid a needless round trip conversion.
                     */
                    cstate->transition_capture->tcs_original_insert_tuple = tuple;
                    cstate->transition_capture->tcs_map = NULL;
                }
            }

            /*
             * We might need to convert from the parent rowtype to the
             * partition rowtype.
             */
            tuple = ConvertPartitionTupleSlot(resultRelInfo->ri_RelationDesc, proute->parent_child_tupconv_maps[leaf_part_index],
                                              tuple,
                                              proute->partition_tuple_slot,
                                              &slot);

            tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
        }

        skip_tuple = false;

        /* BEFORE ROW INSERT Triggers */
        if (resultRelInfo->ri_TrigDesc &&
            resultRelInfo->ri_TrigDesc->trig_insert_before_row)
        {
            slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);

            if (slot == NULL)    /* "do nothing" */
                skip_tuple = true;
            else                /* trigger might have changed tuple */
                tuple = ExecMaterializeSlot(slot);
        }

        if (!skip_tuple)
        {
            if (resultRelInfo->ri_TrigDesc &&
                resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
            {
                /* Pass the data to the INSTEAD ROW INSERT trigger */
                ExecIRInsertTriggers(estate, resultRelInfo, slot);
            }
            else
            {
				/* Check the constraints of the tuple */
				if (resultRelInfo->ri_RelationDesc->rd_att->constr)
				   ExecConstraints(resultRelInfo, slot, estate);

                /*
				* Also check the tuple against the partition constraint, if
				* there is one; except that if we got here via tuple-routing,
				* we don't need to if there's no BR trigger defined on the
				* partition.
                 */
				if (resultRelInfo->ri_PartitionCheck &&
				   (saved_resultRelInfo == NULL ||
				    (resultRelInfo->ri_TrigDesc &&
				     resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
				   ExecPartitionCheck(resultRelInfo, slot, estate, true);

#ifdef _MLS_
                if (is_mls_user())
                    CheckMlsTableUserAcl(resultRelInfo,slot->tts_tuple);
#endif

#ifdef __OPENTENBASE__
                if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel))
                {
                    /* router for tuple */
                    partkey = RelationGetPartitionColumnIndex(cstate->rel);
                    partvalue = slot_getattr(slot, partkey, &isnull);

                    if(isnull)
                    {
                        partidx = -1;
                    }
                    else
                    {
                        partidx = RelationGetPartitionIdxByValue(cstate->rel,partvalue);
                    }
                }                        
#endif
                if (useHeapMultiInsert)
                {
                    /* Add this tuple to the tuple buffer */
                    if (nBufferedTuples == 0)
                        firstBufferedLineNo = cstate->cur_lineno;

#ifdef __OPENTENBASE__
                    if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel) && partidx >= 0)
                    {
                        part_bufferedTuples[partidx][part_nBufferedTuples[partidx]++] = tuple;
                        part_bufferedTuplesSize[partidx] += tuple->t_len;
                    }
                    else
                    {
#endif
                        bufferedTuples[nBufferedTuples++] = tuple;
                        bufferedTuplesSize += tuple->t_len;
#ifdef __OPENTENBASE__
                    }
#endif

                    /*
                     * If the buffer filled up, flush it.  Also flush if the
                     * total size of all the tuples in the buffer becomes
                     * large, to avoid using large amounts of memory for the
                     * buffer when the tuples are exceptionally wide.
                     */
#ifdef __OPENTENBASE__
                    if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel))
                    {
                        if(part_nBufferedTuples[partidx] == MAX_BUFFERED_TUPLES ||
                            part_bufferedTuplesSize[partidx] > 65535 ||
                            nBufferedTuples == MAX_BUFFERED_TUPLES ||
                            bufferedTuplesSize > 65535)
                        {
                            int i=0;
							char *partname = NULL;
							Oid partoid = InvalidOid;
							int part_rel_index = 0;

                            for(i=0; i < npart; i++)
                            {
								if(part_nBufferedTuples[i] > 0)
								{
                                    /* the child table have data to be inserted */
									partname = GetPartitionName(RelationGetRelid(cstate->rel), i, false);
									partoid = get_relname_relid(partname, RelationGetNamespace(cstate->rel));
									if(partoid)
									{
										/* Find the corresponding partition child table info that has not been dropped */
										for(part_rel_index = 0; part_rel_index < resultRelInfo->partarraysize; part_rel_index++)
										{
											if(i == resultRelInfo->part_relinfo[part_rel_index]->part_index)
											{
												/* insert data into corresponding partition child table */
												partRel = resultRelInfo->part_relinfo[part_rel_index];
												tempparent = cstate->rel;
												tempRelInfo =  estate->es_result_relation_info;
												estate->es_result_relation_info = partRel;
												cstate->rel = partRel->ri_RelationDesc;


				                                CopyFromInsertBatch(cstate, estate, mycid, hi_options,
				        													partRel, myslot, part_bistates[i],
				        													part_nBufferedTuples[i], part_bufferedTuples[i],
				        													firstBufferedLineNo);

												estate->es_result_relation_info = tempRelInfo;
												cstate->rel = tempparent;

												part_nBufferedTuples[i] = 0;
												part_bufferedTuplesSize[i] = 0;
												break;
											}
										}
									}
									else
									{
										/* The data of the dropped child table is classified into the default partition */
		                                CopyFromInsertBatch(cstate, estate, mycid, hi_options,
														resultRelInfo, myslot, bistate,
														part_nBufferedTuples[i], part_bufferedTuples[i],
														firstBufferedLineNo);

		                                part_nBufferedTuples[i] = 0;
		                                part_bufferedTuplesSize[i] = 0;
									}
								}
                            }

                            if(nBufferedTuples > 0)
                            {
								/* The data of > npart table is classified into the default partition */
                                CopyFromInsertBatch(cstate, estate, mycid, hi_options,
                                                resultRelInfo, myslot, bistate,
                                                nBufferedTuples, bufferedTuples,
                                                firstBufferedLineNo);

                                nBufferedTuples = 0;
                                bufferedTuplesSize = 0;
                            }

                            need_to_reset = true;
                        }
                    }
                    else
                    {
#endif

                    if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
                        bufferedTuplesSize > 65535)
                    {

                        CopyFromInsertBatch(cstate, estate, mycid, hi_options,
                                        resultRelInfo, myslot, bistate,
                                        nBufferedTuples, bufferedTuples,
                                        firstBufferedLineNo);

                        nBufferedTuples = 0;
                        bufferedTuplesSize = 0;
                    }
#ifdef __OPENTENBASE__
                    }
#endif

                }
                else
                {
                    List       *recheckIndexes = NIL;

#ifdef __OPENTENBASE__
                    if(IS_PGXC_DATANODE && RELATION_IS_INTERVAL(cstate->rel) && partidx >= 0)
                    {
                        partRel = resultRelInfo->part_relinfo[partidx];         

                        heap_insert(partRel->ri_RelationDesc, tuple,
                                                mycid, hi_options, part_bistates[partidx]);
                        
                        if (partRel->ri_NumIndices > 0)
                        {
                            tempRelInfo =  estate->es_result_relation_info;
                            estate->es_result_relation_info = partRel;
                            recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
                                                                   estate, false,
                                                                   NULL,
                                                                   NULL);
                            estate->es_result_relation_info = tempRelInfo;
                        }
                    }
                    else
                    {
#endif

                    /* OK, store the tuple and create index entries for it */
                    heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
                                hi_options, bistate);

                    if (resultRelInfo->ri_NumIndices > 0)
                        recheckIndexes = ExecInsertIndexTuples(slot,
                                                               &(tuple->t_self),
                                                               estate,
                                                               false,
                                                               NULL,
                                                               NIL);
#ifdef __OPENTENBASE__
                    }
#endif
                    /* AFTER ROW INSERT Triggers */
                    ExecARInsertTriggers(estate, resultRelInfo, tuple,
                                         recheckIndexes, cstate->transition_capture);

                    list_free(recheckIndexes);
                }
            }

            /*
             * We count only tuples not suppressed by a BEFORE INSERT trigger;
             * this is the same definition used by execMain.c for counting
             * tuples inserted by an INSERT command.
             */
            processed++;
		}

        /* Restore the saved ResultRelInfo */
            if (saved_resultRelInfo)
            {
                resultRelInfo = saved_resultRelInfo;
                estate->es_result_relation_info = resultRelInfo;
            }
#ifdef PGXC
        }
#endif
    }
    /* Flush any remaining buffered tuples */
#ifdef __OPENTENBASE__
    if(IS_PGXC_DATANODE && npart > 0)
    {
        int partcur = 0;
		char *partname = NULL;
		Oid partoid = InvalidOid;
		int part_rel_index = 0;

        for(partcur = 0; partcur < npart; partcur++)
        {
            if(part_nBufferedTuples[partcur] > 0)
            {
				/* the child table have data to be inserted */
				partname = GetPartitionName(RelationGetRelid(cstate->rel), partcur, false);
				partoid = get_relname_relid(partname, RelationGetNamespace(cstate->rel));
				if(partoid)
				{
					/* Find the corresponding partition child table info that has not been dropped */
					for(part_rel_index = 0; part_rel_index < resultRelInfo->partarraysize; part_rel_index++)
					{
						if(partcur == resultRelInfo->part_relinfo[part_rel_index]->part_index)
						{
							/* insert data into corresponding partition child table */
							partRel = resultRelInfo->part_relinfo[part_rel_index];
							tempparent = cstate->rel;
							tempRelInfo =  estate->es_result_relation_info;
							estate->es_result_relation_info = partRel;
							cstate->rel = partRel->ri_RelationDesc;


			                CopyFromInsertBatch(cstate, estate, mycid, hi_options,
										partRel, myslot, part_bistates[partcur],
										part_nBufferedTuples[partcur], part_bufferedTuples[partcur],
										firstBufferedLineNo);


							estate->es_result_relation_info = tempRelInfo;
							cstate->rel = tempparent;
							part_nBufferedTuples[partcur] = 0;
							part_bufferedTuplesSize[partcur] = 0;
							break;
						}
					}
				}
				else
				{
					/* The data of the dropped child table is classified into the default partition */
			        CopyFromInsertBatch(cstate, estate, mycid, hi_options,
										resultRelInfo, myslot, bistate,
										part_nBufferedTuples[partcur], part_bufferedTuples[partcur],
										firstBufferedLineNo);

			        part_nBufferedTuples[partcur] = 0;
			        part_bufferedTuplesSize[partcur] = 0;
				}
            }
        }
    }
#endif

    if (nBufferedTuples > 0)
    {
		/* The data of > npart table is classified into the default partition */
        CopyFromInsertBatch(cstate, estate, mycid, hi_options,
                            resultRelInfo, myslot, bistate,
                            nBufferedTuples, bufferedTuples,
                            firstBufferedLineNo);

        nBufferedTuples = 0;
        bufferedTuplesSize = 0;
    }

#ifdef XCP
    /*
     * Now if line buffer contains some data that is an EOF marker. We should
     * send it to all the participating datanodes
     */
    if (cstate->line_buf.len > 0)
    {
        RemoteCopyData        *rcstate = cstate->remoteCopyState;
        if (DataNodeCopyIn(cstate->line_buf.data,
                           cstate->line_buf.len,
                           getLocatorNodeCount(rcstate->locator),
                           (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator),
                           cstate->binary))
        {
            int conn_count;
            int loop;
            PGXCNodeHandle** copy_connections;
            StringInfoData   sqldata;
            StringInfo       sql; 

            sql = &sqldata;
            initStringInfo(sql);

            conn_count       = getLocatorNodeCount(rcstate->locator);
            copy_connections = (PGXCNodeHandle**) getLocatorNodeMap(rcstate->locator);
            
            for(loop = 0; loop < conn_count; loop++)
            {
                PGXCNodeHandle *handle = copy_connections[loop];
                if ('\0' != handle->error[0])
                {
                    appendStringInfo(sql, "%s;", handle->error);
                }
            }

            ereport(ERROR,
                        (errcode(ERRCODE_CONNECTION_EXCEPTION),
                         errmsg("Copy failed on a data node:%s", sql->data)));
        }

    }
#endif

    /* Done, clean up */
    error_context_stack = errcallback.previous;

#ifdef __OPENTENBASE__
    if(IS_PGXC_DATANODE && RelationGetNParts(cstate->rel)> 0)
    {
        int i = 0;
        for(i = 0; i < RelationGetNParts(cstate->rel); i++)
            FreeBulkInsertState(part_bistates[i]);
    }
#endif

    FreeBulkInsertState(bistate);
#ifdef __OPENTENBASE__
    if(IS_PGXC_DATANODE && npart > 0)
    {
        if(part_nBufferedTuples)
            pfree(part_nBufferedTuples);
        if(part_bufferedTuplesSize)
            pfree(part_bufferedTuplesSize);

        if(part_bufferedTuples)
        {
            int ii;
            for(ii=0; ii < npart; ii++)
            {
                if(part_bufferedTuples[ii])
                    pfree(part_bufferedTuples[ii]);
            }
            pfree(part_bufferedTuples);
        }
    }
#endif

    MemoryContextSwitchTo(oldcontext);

    /*
     * In the old protocol, tell pqcomm that we can process normal protocol
     * messages again.
     */
    if (cstate->copy_dest == COPY_OLD_FE)
        pq_endmsgread();

    /* Execute AFTER STATEMENT insertion triggers */
    ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);

    /* Handle queued AFTER triggers */
    AfterTriggerEndQuery(estate);

    pfree(values);
    pfree(nulls);

    ExecResetTupleTable(estate->es_tupleTable, false);

#ifdef __OPENTENBASE__
    if(resultRelInfo->ispartparent)
    {
        int j=0;
        ResultRelInfo *partrel = NULL;
        for(j = 0; j < resultRelInfo->partarraysize; j++)
        {
            partrel = resultRelInfo->part_relinfo[j];
            if(partrel)
            {
                ExecCloseIndices(partrel);
                heap_close(partrel->ri_RelationDesc, RowExclusiveLock);
            }
        }
    }
#endif

    ExecCloseIndices(resultRelInfo);

    /* Close all the partitioned tables, leaf partitions, and their indices */
	if (cstate->partition_tuple_routing)
		ExecCleanupTupleRouting(cstate->partition_tuple_routing);

    /* Close any trigger target relations */
    ExecCleanUpTriggerState(estate);

    FreeExecutorState(estate);

    /*
     * If we skipped writing WAL, then we need to sync the heap (but not
     * indexes since those use WAL anyway)
     */
    if (hi_options & HEAP_INSERT_SKIP_WAL)
        heap_sync(cstate->rel);

    return processed;
}

/*
 * CopyFromInsertBatch
 *
 * Helper for CopyFrom: writes the tuples collected in bufferedTuples[] to
 * the heap with one heap_multi_insert() call.  Afterwards, tuple by tuple,
 * it creates index entries (when the target has indexes) and fires the
 * AFTER ROW INSERT triggers.
 *
 * firstBufferedLineNo is the input line number belonging to
 * bufferedTuples[0].  While a tuple is post-processed, cstate->cur_lineno
 * is set to that tuple's line; the value cur_lineno had on entry is put
 * back before returning.
 */
static void
CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
                    int hi_options, ResultRelInfo *resultRelInfo,
                    TupleTableSlot *myslot, BulkInsertState bistate,
                    int nBufferedTuples, HeapTuple *bufferedTuples,
                    int firstBufferedLineNo)
{
    MemoryContext caller_context;
    int         lineno_on_entry;
    int         idx;
    bool        has_indexes;
    bool        needs_row_work;

    /*
     * Invalidate line_buf and remember the current line number, so that
     * error context is reported correctly should anything below fail.
     */
    cstate->line_buf_valid = false;
    lineno_on_entry = cstate->cur_lineno;

    /*
     * heap_multi_insert leaks memory; run it inside the short-lived
     * per-tuple context and come straight back afterwards.
     */
    caller_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    heap_multi_insert(cstate->rel, bufferedTuples, nBufferedTuples,
                      mycid, hi_options, bistate);
    MemoryContextSwitchTo(caller_context);

    /*
     * Per-tuple follow-up is required when the relation has indexes, or,
     * failing that, when it has AFTER ROW INSERT triggers or a transition
     * table to feed.
     */
    has_indexes = (resultRelInfo->ri_NumIndices > 0);
    needs_row_work = has_indexes ||
        (resultRelInfo->ri_TrigDesc != NULL &&
         (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
          resultRelInfo->ri_TrigDesc->trig_insert_new_table));

    if (!needs_row_work)
    {
        /* Nothing more to do; put cur_lineno back where it was. */
        cstate->cur_lineno = lineno_on_entry;
        return;
    }

    for (idx = 0; idx < nBufferedTuples; idx++)
    {
        HeapTuple   tup = bufferedTuples[idx];
        List       *recheck = NIL;

        /* Make error context point at the line this tuple came from. */
        cstate->cur_lineno = firstBufferedLineNo + idx;

        if (has_indexes)
        {
            /* Index insertion reads the tuple through the slot. */
            ExecStoreTuple(tup, myslot, InvalidBuffer, false);
            recheck = ExecInsertIndexTuples(myslot, &(tup->t_self),
                                            estate, false, NULL, NIL);
        }

        ExecARInsertTriggers(estate, resultRelInfo, tup,
                             recheck, cstate->transition_capture);

        if (has_indexes)
            list_free(recheck);
    }

    /* Put cur_lineno back where it was on entry. */
    cstate->cur_lineno = lineno_on_entry;
}

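/*
 * Setup to read tuples from a file for COPY FROM.
 *
 * 'pstate': Parse state; may be NULL. Its range table is saved for CopyFrom.
 * 'rel': Used as a template for the tuples
 * 'filename': Name of server-local file to read. NULL means the data does
 *     not come from a file: it is read from the client connection or stdin.
 * 'is_program': If true, 'filename' is a command to execute and read from.
 * 'data_source_cb': If non-NULL, callback used as the data source; it takes
 *     precedence over 'filename'.
 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
 *
 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
 */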
CopyState
BeginCopyFrom(ParseState *pstate,
              Relation rel,
              const char *filename,
              bool is_program,
              copy_data_source_cb data_source_cb,
              List *attnamelist,
              List *options)
{// #lizard forgives
    CopyState    cstate;
    bool        pipe = (filename == NULL);
    TupleDesc    tupDesc;
    Form_pg_attribute *attr;
    AttrNumber    num_phys_attrs,
                num_defaults;
    FmgrInfo   *in_functions;
    Oid           *typioparams;
    int            attnum;
    Oid            in_func_oid;
    int           *defmap;
    ExprState **defexprs;
    MemoryContext oldcontext;
    bool        volatile_defexprs;

    cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options, NULL);
    oldcontext = MemoryContextSwitchTo(cstate->copycontext);

    /* Initialize state variables */
    cstate->fe_eof = false;
    cstate->eol_type = EOL_UNKNOWN;
    cstate->cur_relname = RelationGetRelationName(cstate->rel);
    cstate->cur_lineno = 0;
    cstate->cur_attname = NULL;
    cstate->cur_attval = NULL;
#ifdef __OPENTENBASE__
    cstate->data_list = NULL;
    cstate->data_index = 0;
    cstate->data_ncolumns = 0;
    cstate->ndatarows = 0;

    if (pstate && pstate->stmt && pstate->stmt->insert_into)
    {
        cstate->data_list = pstate->stmt->data_list;
        cstate->data_ncolumns = pstate->stmt->ncolumns;
        cstate->ndatarows = pstate->stmt->ndatarows;
    }
#endif

    /* Set up variables to avoid per-attribute overhead. */
    initStringInfo(&cstate->attribute_buf);
    initStringInfo(&cstate->line_buf);
    cstate->line_buf_converted = false;
    cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
    cstate->raw_buf_index = cstate->raw_buf_len = 0;

    /* Assign range table, we'll need it in CopyFrom. */
    if (pstate)
        cstate->range_table = pstate->p_rtable;

    tupDesc = RelationGetDescr(cstate->rel);
    attr = tupDesc->attrs;
    num_phys_attrs = tupDesc->natts;
    num_defaults = 0;
    volatile_defexprs = false;

    /*
     * Pick up the required catalog information for each attribute in the
     * relation, including the input function, the element type (to pass to
     * the input function), and info about defaults and constraints. (Which
     * input function we use depends on text/binary format choice.)
     */
    in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

#ifdef PGXC
    /* Output functions are required to convert default values to output form */
    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
#endif

    for (attnum = 1; attnum <= num_phys_attrs; attnum++)
    {
        /* We don't need info for dropped attributes */
        if (attr[attnum - 1]->attisdropped)
            continue;

        /* Fetch the input function and typioparam info */
        if (cstate->binary)
            getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
                                   &in_func_oid, &typioparams[attnum - 1]);
        else
            getTypeInputInfo(attr[attnum - 1]->atttypid,
                             &in_func_oid, &typioparams[attnum - 1]);
        fmgr_info(in_func_oid, &in_functions[attnum - 1]);

        /* Get default info if needed */
        if (!list_member_int(cstate->attnumlist, attnum))
        {
            /* attribute is NOT to be copied from input */
            /* use default value if one exists */
            Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
                                                                attnum);

            if (defexpr != NULL)
            {
#ifdef PGXC
                if (IS_PGXC_COORDINATOR)
                {
                    /*
                     * If default expr is shippable to Datanode, don't include
                     * default values in the data row sent to the Datanode; let
                     * the Datanode insert the default values.
                     */
                    Expr *planned_defexpr = expression_planner((Expr *) defexpr);
                    defexpr = expression_planner(defexpr);

#ifdef __OPENTENBASE__
                    /* treat distributed column as defaults too */
                    if (!pgxc_is_expr_shippable(planned_defexpr, NULL) ||
                        IsDistributedColumn(attnum, rel->rd_locator_info))
#else
                    if (!pgxc_is_expr_shippable(planned_defexpr, NULL))
#endif
                    {
                        Oid    out_func_oid;
                        bool   isvarlena;
                        /* Initialize expressions in copycontext. */
                        defexprs[num_defaults] = ExecInitExpr(planned_defexpr, NULL);
                        defmap[num_defaults] = attnum - 1;
                        num_defaults++;

                        /*
                         * Initialize output functions needed to convert default
                         * values into output form before appending to data row.
                         */
                        if (cstate->binary)
                            getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
                                                    &out_func_oid, &isvarlena);
                        else
                            getTypeOutputInfo(attr[attnum - 1]->atttypid,
                                              &out_func_oid, &isvarlena);
                        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
                    }
                }
                else
                {
#endif /* PGXC */
                /* Run the expression through planner */
                defexpr = expression_planner(defexpr);

                /* Initialize executable expression in copycontext */
                defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
                defmap[num_defaults] = attnum - 1;
                num_defaults++;

                /*
                 * If a default expression looks at the table being loaded,
                 * then it could give the wrong answer when using
                 * multi-insert. Since database access can be dynamic this is
                 * hard to test for exactly, so we use the much wider test of
                 * whether the default expression is volatile. We allow for
                 * the special case of when the default expression is the
                 * nextval() of a sequence which in this specific case is
                 * known to be safe for use with the multi-insert
                 * optimization. Hence we use this special case function
                 * checker rather than the standard check for
                 * contain_volatile_functions().
                 */
                if (!volatile_defexprs)
                    volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
#ifdef PGXC
                }
#endif
            }
        }
    }

    /* We keep those variables in cstate. */
    cstate->in_functions = in_functions;
    cstate->typioparams = typioparams;
    cstate->defmap = defmap;
    cstate->defexprs = defexprs;
    cstate->volatile_defexprs = volatile_defexprs;
    cstate->num_defaults = num_defaults;
    cstate->is_program = is_program;

    if (data_source_cb)
    {
        cstate->copy_dest = COPY_CALLBACK;
        cstate->data_source_cb = data_source_cb;
    }
    else if (pipe)
    {
        Assert(!is_program);    /* the grammar does not allow this */
        if (whereToSendOutput == DestRemote)
            ReceiveCopyBegin(cstate);
        else
            cstate->copy_file = stdin;
    }
    else
    {
        cstate->filename = pstrdup(filename);

        if (cstate->is_program)
        {
            cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
            if (cstate->copy_file == NULL)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not execute command \"%s\": %m",
                                cstate->filename)));
        }
        else
        {
#ifdef __OPENTENBASE__
            if (pstate && pstate->stmt && pstate->stmt->insert_into)
            {
                /* insert_into to copy_from */
            }
            else
            {
#endif
            struct stat st;

            cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
            if (cstate->copy_file == NULL)
            {
                /* copy errno because ereport subfunctions might change it */
                int            save_errno = errno;

                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not open file \"%s\" for reading: %m",
                                cstate->filename),
                         (save_errno == ENOENT || save_errno == EACCES) ?
                         errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
                                 "You may want a client-side facility such as psql's \\copy.") : 0));
            }

            if (fstat(fileno(cstate->copy_file), &st))
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not stat file \"%s\": %m",
                                cstate->filename)));

            if (S_ISDIR(st.st_mode))
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                         errmsg("\"%s\" is a directory", cstate->filename)));
#ifdef __OPENTENBASE__
            }
#endif
        }
    }

    if (!cstate->binary)
    {
        /* must rely on user to tell us... */
        cstate->file_has_oids = cstate->oids;
    }
    else
    {
        /* Read and verify binary header */
        char        readSig[11];
        int32        tmp;

        /* Signature */
        if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
            memcmp(readSig, BinarySignature, 11) != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("COPY file signature not recognized")));
        /* Flags field */
        if (!CopyGetInt32(cstate, &tmp))
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("invalid COPY file header (missing flags)")));
        cstate->file_has_oids = (tmp & (1 << 16)) != 0;
        tmp &= ~(1 << 16);
        if ((tmp >> 16) != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("unrecognized critical flags in COPY file header")));
        /* Header extension length */
        if (!CopyGetInt32(cstate, &tmp) ||
            tmp < 0)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("invalid COPY file header (missing length)")));
        /* Skip extension header, if present */
        while (tmp-- > 0)
        {
            if (CopyGetData(cstate, readSig, 1, 1) != 1)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("invalid COPY file header (wrong length)")));
        }
#ifdef PGXC
        /* This is done at the beginning of COPY FROM from Coordinator to Datanodes */
        if (IS_PGXC_COORDINATOR)
        {
            RemoteCopyData *remoteCopyState = cstate->remoteCopyState;

            /* Empty buffer info and send header to all the backends involved in COPY */
            resetStringInfo(&cstate->line_buf);

            enlargeStringInfo(&cstate->line_buf, 19);
            appendBinaryStringInfo(&cstate->line_buf, BinarySignature, 11);
            tmp = 0;

            if (cstate->oids)
                tmp |= (1 << 16);
            tmp = htonl(tmp);

            appendBinaryStringInfo(&cstate->line_buf, (char *) &tmp, 4);
            tmp = 0;
            tmp = htonl(tmp);
            appendBinaryStringInfo(&cstate->line_buf, (char *) &tmp, 4);

            if (DataNodeCopyInBinaryForAll(cstate->line_buf.data, 19,
                    getLocatorNodeCount(remoteCopyState->locator),
                    (PGXCNodeHandle **) getLocatorNodeMap(remoteCopyState->locator)))
                ereport(ERROR,
                            (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                             errmsg("invalid COPY file header (COPY SEND)")));
        }
#endif
    }

    if (cstate->file_has_oids && cstate->binary)
    {
        getTypeBinaryInputInfo(OIDOID,
                               &in_func_oid, &cstate->oid_typioparam);
        fmgr_info(in_func_oid, &cstate->oid_in_function);
    }

    /* create workspace for CopyReadAttributes results */
    if (!cstate->binary)
    {
        AttrNumber    attr_count = list_length(cstate->attnumlist);
        int            nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;

        cstate->max_fields = nfields;
        cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
    }

    MemoryContextSwitchTo(oldcontext);

    return cstate;
}

/*
 * NextCopyFromRawFields
 *
 * Fetch the next input line of a text/CSV COPY FROM and split it into raw
 * (not yet type-converted) field strings.
 *
 * On success, *fields is set to cstate->raw_fields and *nfields to the
 * number of fields found on the line, and true is returned.  The field
 * array belongs to cstate; callers should treat it as valid only until the
 * next call.  The field count reflects what was in the input and is not
 * checked here against the number of target columns.
 *
 * Returns false when there is no further line to process.
 *
 * NOTE: the force_not_null option is not applied to the returned fields.
 */
bool
NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
{// #lizard forgives
    bool        at_eof;
    int         field_count;

    /* This entry point only makes sense for text or csv input */
    Assert(!cstate->binary);

    /* Before the first data line, consume and discard the header line */
    if (cstate->cur_lineno == 0 && cstate->header_line)
    {
        bool        header_hit_eof;

        cstate->cur_lineno++;
        header_hit_eof = CopyReadLine(cstate);
        if (header_hit_eof)
            return false;       /* input ended with the header */
    }

    /* Pull the next data line into line_buf */
    cstate->cur_lineno++;
    at_eof = CopyReadLine(cstate);

    /*
     * EOF with an empty line buffer means there is nothing left.  EOF after
     * some characters is treated like a final line without a terminator:
     * it is processed now and the following call reports the end.
     */
    if (at_eof && cstate->line_buf.len == 0)
        return false;

    /* Split the line into de-escaped field values */
    if (cstate->csv_mode)
    {
        field_count = CopyReadAttributesCSV(cstate);
    }
#ifdef __OPENTENBASE__
    else if (cstate->internal_mode)
    {
        field_count = CopyReadAttributesInternal(cstate);
    }
#endif
    else
    {
        field_count = CopyReadAttributesText(cstate);
    }

    *nfields = field_count;
    *fields = cstate->raw_fields;

    return true;
}

/*
 * Read next tuple from file for COPY FROM. Return false if no more tuples.
 *
 * 'econtext' is used to evaluate the default expression for each column not
 * read from the file. It can be NULL when no default values are used, i.e.
 * when all columns are read from the file (cstate->num_defaults == 0).
 *
 * 'values' and 'nulls' arrays must be the same length as columns of the
 * relation passed to BeginCopyFrom. This function fills the arrays; every
 * entry is first reset to NULL, so columns that are neither read nor
 * defaulted come back as NULL.
 * Oid of the tuple is returned with 'tupleOid' separately (only written
 * when cstate->oids is set and 'tupleOid' is not NULL).
 *
 * Under PGXC, when running on a Coordinator, the computed default values
 * are additionally appended to cstate->line_buf via append_defvals().
 */
bool
NextCopyFrom(CopyState cstate, ExprContext *econtext,
             Datum *values, bool *nulls, Oid *tupleOid)
{// #lizard forgives
    TupleDesc    tupDesc;
    Form_pg_attribute *attr;
    AttrNumber    num_phys_attrs,
                attr_count,
                num_defaults = cstate->num_defaults;
    FmgrInfo   *in_functions = cstate->in_functions;
    Oid           *typioparams = cstate->typioparams;
    int            i;
    int            nfields;
    bool        isnull;
    bool        file_has_oids = cstate->file_has_oids;
    int           *defmap = cstate->defmap;
    ExprState **defexprs = cstate->defexprs;

    tupDesc = RelationGetDescr(cstate->rel);
    attr = tupDesc->attrs;
    num_phys_attrs = tupDesc->natts;
    attr_count = list_length(cstate->attnumlist);
    /* Expected number of input fields: one extra when an OID column leads */
    nfields = file_has_oids ? (attr_count + 1) : attr_count;

    /* Initialize all values for row to NULL */
    MemSet(values, 0, num_phys_attrs * sizeof(Datum));
    MemSet(nulls, true, num_phys_attrs * sizeof(bool));

    if (!cstate->binary)
    {
        char      **field_strings;
        ListCell   *cur;
        int            fldct;
        int            fieldno;
        char       *string;

#ifdef __OPENTENBASE__
        /*
         * INSERT-converted-to-COPY path: rows come from the in-memory
         * cstate->data_list instead of being read and parsed from input.
         */
        if (cstate->insert_into && cstate->data_list)
        {
            resetStringInfo(&cstate->line_buf);

            /* send data in datarow format to remote datanode */
            if (cstate->data_index < cstate->ndatarows)
            {
                int index = 0;
                uint16     n16 = 0;
                int total_columns = 0;
                field_strings = cstate->data_list[cstate->data_index];
                cstate->data_index++;
                fldct = cstate->data_ncolumns;
                /* defaults are appended later, so count them up front */
                total_columns = fldct + num_defaults;

                /* row header: 2-byte column count in network byte order */
                n16 = htons(total_columns);
                appendBinaryStringInfo(&cstate->line_buf, (char *) &n16, sizeof(n16));
                
                for(index = 0; index < fldct; index++)
                {
                    if (field_strings[index])
                    {
                        /*
                         * Non-NULL field: 4-byte network-order length that
                         * includes the terminating NUL, then the string
                         * bytes, then the NUL itself.
                         */
                        int len = strlen(field_strings[index]) + 1;
                        uint32 n32 = htonl(len);
                        appendBinaryStringInfo(&cstate->line_buf, (char *) &n32, sizeof(n32));
                        appendBinaryStringInfo(&cstate->line_buf, field_strings[index], len - 1);
                        appendStringInfoChar(&cstate->line_buf, '\0');
                    }
                    else
                    {
                        /* NULL field: length -1 and no payload */
                        uint32 n32 = htonl(-1);
                        appendBinaryStringInfo(&cstate->line_buf, (char *) &n32, 4);
                    }
                }
            }
            else
            {
                /* all in-memory rows have been consumed */
                return false;
            }
        }
        else
#endif
            /* read raw fields in the next line */
            if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
                return false;

        /* check for overflowing fields */
        if (nfields > 0 && fldct > nfields)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("extra data after last expected column")));

        fieldno = 0;

        /* Read the OID field if present */
        if (file_has_oids)
        {
            if (fieldno >= fldct)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("missing data for OID column")));
            string = field_strings[fieldno++];

            if (string == NULL)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("null OID in COPY data")));
            else if (cstate->oids && tupleOid != NULL)
            {
                /* cur_attname/cur_attval are set only around the conversion */
                cstate->cur_attname = "oid";
                cstate->cur_attval = string;
                *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
                                                                 CStringGetDatum(string)));
                if (*tupleOid == InvalidOid)
                    ereport(ERROR,
                            (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                             errmsg("invalid OID in COPY data")));
                cstate->cur_attname = NULL;
                cstate->cur_attval = NULL;
            }
        }

        /* Loop to read the user attributes on the line. */
        foreach(cur, cstate->attnumlist)
        {
            int            attnum = lfirst_int(cur);
            int            m = attnum - 1;    /* zero-based index into attr/values/nulls */

            if (fieldno >= fldct)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("missing data for column \"%s\"",
                                NameStr(attr[m]->attname))));
            string = field_strings[fieldno++];

            if (cstate->convert_select_flags &&
                !cstate->convert_select_flags[m])
            {
                /* ignore input field, leaving column as NULL */
                continue;
            }

            if (cstate->csv_mode)
            {
                if (string == NULL &&
                    cstate->force_notnull_flags[m])
                {
                    /*
                     * FORCE_NOT_NULL option is set and column is NULL -
                     * convert it to the NULL string.
                     */
                    string = cstate->null_print;
                }
                else if (string != NULL && cstate->force_null_flags[m]
                         && strcmp(string, cstate->null_print) == 0)
                {
                    /*
                     * FORCE_NULL option is set and column matches the NULL
                     * string. It must have been quoted, or otherwise the
                     * string would already have been set to NULL. Convert it
                     * to NULL as specified.
                     */
                    string = NULL;
                }
            }

            cstate->cur_attname = NameStr(attr[m]->attname);
            cstate->cur_attval = string;
            /* the input function is invoked even when string is NULL */
            values[m] = InputFunctionCall(&in_functions[m],
                                          string,
                                          typioparams[m],
                                          attr[m]->atttypmod);
            if (string != NULL)
                nulls[m] = false;
            cstate->cur_attname = NULL;
            cstate->cur_attval = NULL;
        }

        /* every expected field must have been consumed by now */
        Assert(fieldno == nfields);
    }
    else
    {
        /* binary */
        int16        fld_count;
        ListCell   *cur;

        cstate->cur_lineno++;

        if (!CopyGetInt16(cstate, &fld_count))
        {
#ifdef PGXC
            if (IS_PGXC_COORDINATOR)
            {
                /* Empty buffer */
                resetStringInfo(&cstate->line_buf);

                enlargeStringInfo(&cstate->line_buf, sizeof(uint16));
                /*
                 * Put the field count into line_buf in network byte order.
                 * NOTE(review): the read above failed, so the value stored
                 * here is whatever CopyGetInt16 leaves in fld_count on
                 * failure -- confirm against its definition.
                 */
                fld_count = htons(fld_count);
                appendBinaryStringInfo(&cstate->line_buf, (char *) &fld_count, sizeof(uint16));
            }
#endif

            /* EOF detected (end of file, or protocol-level EOF) */
            return false;
        }

        if (fld_count == -1)
        {
            /*
             * Received EOF marker.  In a V3-protocol copy, wait for the
             * protocol-level EOF, and complain if it doesn't come
             * immediately.  This ensures that we correctly handle CopyFail,
             * if client chooses to send that now.
             *
             * Note that we MUST NOT try to read more data in an old-protocol
             * copy, since there is no protocol-level EOF marker then.  We
             * could go either way for copy from file, but choose to throw
             * error if there's data after the EOF marker, for consistency
             * with the new-protocol case.
             */
            char        dummy;

#ifdef PGXC
            if (IS_PGXC_COORDINATOR)
            {
                /* Empty buffer */
                resetStringInfo(&cstate->line_buf);

                enlargeStringInfo(&cstate->line_buf, sizeof(uint16));
                /* Put the EOF marker (-1) into line_buf in network byte order */
                fld_count = htons(fld_count);
                appendBinaryStringInfo(&cstate->line_buf, (char *) &fld_count, sizeof(uint16));
            }
#endif

            if (cstate->copy_dest != COPY_OLD_FE &&
                CopyGetData(cstate, &dummy, 1, 1) > 0)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("received copy data after EOF marker")));
            return false;
        }

        /* the per-row field count excludes the OID field, if any */
        if (fld_count != attr_count)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("row field count is %d, expected %d",
                            (int) fld_count, attr_count)));

#ifdef PGXC
        if (IS_PGXC_COORDINATOR)
        {
            /*
             * Include the default value count also, because we are going to
             * append default values to the user-supplied attributes.
             */
            int16 total_fld_count = fld_count + num_defaults;
            /* Empty buffer */
            resetStringInfo(&cstate->line_buf);

            enlargeStringInfo(&cstate->line_buf, sizeof(uint16));
            total_fld_count = htons(total_fld_count);
            appendBinaryStringInfo(&cstate->line_buf, (char *) &total_fld_count, sizeof(uint16));
        }
#endif

        if (file_has_oids)
        {
            Oid            loaded_oid;

            cstate->cur_attname = "oid";
            loaded_oid =
                DatumGetObjectId(CopyReadBinaryAttribute(cstate,
                                                         0,
                                                         &cstate->oid_in_function,
                                                         cstate->oid_typioparam,
                                                         -1,
                                                         &isnull));
            if (isnull || loaded_oid == InvalidOid)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         errmsg("invalid OID in COPY data")));
            cstate->cur_attname = NULL;
            /* the OID is validated even if the caller does not want it */
            if (cstate->oids && tupleOid != NULL)
                *tupleOid = loaded_oid;
        }

        /* i is the 1-based column number handed to CopyReadBinaryAttribute */
        i = 0;
        foreach(cur, cstate->attnumlist)
        {
            int            attnum = lfirst_int(cur);
            int            m = attnum - 1;

            cstate->cur_attname = NameStr(attr[m]->attname);
            i++;
            values[m] = CopyReadBinaryAttribute(cstate,
                                                i,
                                                &in_functions[m],
                                                typioparams[m],
                                                attr[m]->atttypmod,
                                                &nulls[m]);
            cstate->cur_attname = NULL;
        }
    }

    /*
     * Now compute and insert any defaults available for the columns not
     * provided by the input data.  Anything not processed here or above will
     * remain NULL.
     */
    for (i = 0; i < num_defaults; i++)
    {
        /*
         * The caller must supply econtext and have switched into the
         * per-tuple memory context in it.
         */
        Assert(econtext != NULL);
        Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);

        /* defmap[i] is the zero-based column index that defexprs[i] fills */
        values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
                                         &nulls[defmap[i]]);
    }

#ifdef PGXC
    if (IS_PGXC_COORDINATOR)
    {
        /* Append default values to the data-row in output format. */
        append_defvals(values, cstate);
    }
#endif

    return true;
}

#ifdef PGXC
/*
 * append_defvals:
 * Append default values in output form onto the data-row held in
 * cstate->line_buf.
 * 1. scans the default values with the help of cstate->defmap,
 * 2. converts each default value into its COPY output form,
 * 3. collects the result in a temporary buffer, which is then appended to
 *    cstate->line_buf, i.e. to the row that is sent to the Datanodes.
 * So for e.g., for a table :
 * tab (id1 int, v varchar, id2 default nextval('tab_id2_seq'::regclass), id3 )
 * with the user-supplied data  : "2 | abcd",
 * and the COPY command such as:
 * copy tab (id1, v) FROM '/tmp/a.txt' (delimiter '|');
 * Here, the temporary buffer will be populated with something like : "| 1"
 * and the final data row will be : "2 | abcd | 1"
 *
 * In the insert-into mode (cstate->insert_into with a data_list) each text
 * default is instead written straight to cstate->line_buf as a 4-byte
 * network-order length (string length plus terminating NUL), the string and
 * the NUL; the temporary buffer is not appended in that mode.
 *
 * 'values' is the per-column Datum array; only the entries named by
 * cstate->defmap[0 .. num_defaults-1] are read.
 *
 * Does nothing when there are no defaults.  The temporary buffer is released
 * before returning.
 *
 * NOTE(review): no null flags are passed in, so a default that evaluated to
 * NULL is handed to the output/send function like any other Datum -- confirm
 * that callers cannot reach this with a NULL default.
 */
static void
append_defvals(Datum *values, CopyState cstate)
{// #lizard forgives
    CopyStateData new_cstate;
    int i;

    /*
     * Without defaults there is nothing to append; skip the struct copy and
     * the buffer allocation that would otherwise happen for every row.
     */
    if (cstate->num_defaults <= 0)
        return;

    /*
     * For using the values in their output form, it is not sufficient
     * to just call its output function. The format should match
     * that of COPY because after all we are going to send this value as
     * an input data row to the Datanode using COPY FROM syntax. So we call
     * exactly those functions that are used to output the values in case
     * of COPY TO. For instance, CopyAttributeOutText() takes care of
     * escaping, CopySendInt32 takes care of byte ordering, etc. All these
     * functions write through cstate->fe_msgbuf, which must not be touched
     * here. So, we use a separate temporary cstate: all the COPY options
     * remain the same, only fe_msgbuf points to a private buffer.
     */
    new_cstate = *cstate;
    new_cstate.fe_msgbuf = makeStringInfo();

    for (i = 0; i < cstate->num_defaults; i++)
    {
        int attindex = cstate->defmap[i];
        Datum defvalue = values[attindex];

        /* text/csv: every appended value is preceded by the delimiter */
        if (!cstate->binary)
            CopySendChar(&new_cstate, new_cstate.delim[0]);

        if (cstate->binary)
        {
            bytea       *outputbytes;

            /* binary: 4-byte payload length followed by the send-function output */
            outputbytes = SendFunctionCall(&cstate->out_functions[attindex], defvalue);
            CopySendInt32(&new_cstate, VARSIZE(outputbytes) - VARHDRSZ);
            CopySendData(&new_cstate, VARDATA(outputbytes),
                         VARSIZE(outputbytes) - VARHDRSZ);
        }
        else
        {
            char *string;

            string = OutputFunctionCall(&cstate->out_functions[attindex], defvalue);
            if (cstate->csv_mode)
                CopyAttributeOutCSV(&new_cstate, string,
                                    false /* don't force quote */,
                                    false /* there's at least one user-supplied attribute */ );
            else
                CopyAttributeOutText(&new_cstate, string);
#ifdef __OPENTENBASE__
            if (cstate->insert_into && cstate->data_list)
            {
                /* length counts the terminating NUL that follows the string */
                int len = strlen(string) + 1;
                uint32 n32 = htonl(len);
                appendBinaryStringInfo(&cstate->line_buf, (char *) &n32, sizeof(n32));
                appendBinaryStringInfo(&cstate->line_buf, string, len - 1);
                appendStringInfoChar(&cstate->line_buf, '\0');
            }
#endif
        }
    }

#ifdef __OPENTENBASE__
    if (cstate->insert_into && cstate->data_list)
    {
        /* do nothing, default values have been appended before */
    }
    else
    {
#endif
    /* Append the generated default values to the user-supplied data-row */
    appendBinaryStringInfo(&cstate->line_buf, new_cstate.fe_msgbuf->data,
                                              new_cstate.fe_msgbuf->len);
#ifdef __OPENTENBASE__
    }
#endif

    /* The scratch buffer has served its purpose; do not leave it behind. */
    pfree(new_cstate.fe_msgbuf->data);
    pfree(new_cstate.fe_msgbuf);
}
#endif


/*
 * EndCopyFrom
 *
 * Finish a COPY FROM operation and release what it holds.
 *
 * Under PGXC, on a Coordinator whose remote copy state carries relation
 * location data, the remote COPY is finished on the nodes known to the
 * locator and the remote copy state is freed.  In every case the generic
 * EndCopy() cleanup runs last.
 */
void
EndCopyFrom(CopyState cstate)
{
#ifdef PGXC
    if (IS_PGXC_COORDINATOR)
    {
        RemoteCopyData *rcopy = cstate->remoteCopyState;

        /* Only a COPY with relation location data has remote work to end */
        if (rcopy->rel_loc)
        {
            DataNodeCopyFinish(getLocatorNodeCount(rcopy->locator),
                    (PGXCNodeHandle **) getLocatorNodeMap(rcopy->locator));
            FreeRemoteCopyData(rcopy);
        }
    }
#endif

    /* Apart from the above, COPY FROM holds nothing but memory. */
    EndCopy(cstate);
}

/*
 * CopyReadLine
 *
 * Load the next input line into cstate->line_buf and convert it to the
 * server encoding when cstate->need_transcoding is set.
 *
 * Returns true if reading stopped at EOF and false if it stopped at an
 * end-of-line marker.  In the latter case the marker is stripped, so
 * line_buf holds only the line's data.
 *
 * line_buf_converted is false while the buffer may still hold unconverted
 * data and is set to true just before returning.
 */
static bool
CopyReadLine(CopyState cstate)
{// #lizard forgives
    StringInfo  line = &cstate->line_buf;
    bool        hit_eof;
    int         eol_len = 0;

    resetStringInfo(line);
    cstate->line_buf_valid = true;

    /* No encoding conversion has been done on this line yet */
    cstate->line_buf_converted = false;

    /* Scan the raw input and move one line's worth into line_buf */
    hit_eof = CopyReadLineText(cstate);

    if (!hit_eof)
    {
        /*
         * The line ended with an EOL marker, which was copied into line_buf
         * together with the data.  Work out how many bytes it occupies.
         */
        switch (cstate->eol_type)
        {
            case EOL_NL:
                Assert(line->len >= 1);
                Assert(line->data[line->len - 1] == '\n');
                eol_len = 1;
                break;
            case EOL_CR:
                Assert(line->len >= 1);
                Assert(line->data[line->len - 1] == '\r');
                eol_len = 1;
                break;
            case EOL_CRNL:
                Assert(line->len >= 2);
                Assert(line->data[line->len - 2] == '\r');
                Assert(line->data[line->len - 1] == '\n');
                eol_len = 2;
                break;
            case EOL_UNKNOWN:
                /* only expected when the COPY was transformed from INSERT */
                if (!cstate->internal_mode)
                {
                    Assert(false);
                }
                break;
        }

        /* Chop the marker off and re-terminate the buffer */
        if (eol_len > 0)
        {
            line->len -= eol_len;
            line->data[line->len] = '\0';
        }
    }
    else if (cstate->copy_dest == COPY_NEW_FE)
    {
        /*
         * Reached EOF.  In protocol version 3, anything after \. up to the
         * protocol end of copy data is ignored: keep marking the raw buffer
         * as consumed until no more data can be loaded.  (XXX maybe better
         * not to treat \. as special?)
         */
        for (;;)
        {
            cstate->raw_buf_index = cstate->raw_buf_len;
            if (!CopyLoadRawBuf(cstate))
                break;
        }
    }

    /* The line is complete; bring it into the server encoding if needed */
    if (cstate->need_transcoding)
    {
        char       *converted = pg_any_to_server(line->data,
                                                 line->len,
                                                 cstate->file_encoding);

        if (converted != line->data)
        {
            /* a new string was returned: replace line_buf's contents */
            resetStringInfo(line);
            appendBinaryStringInfo(line, converted, strlen(converted));
            pfree(converted);
        }
    }

    /* From here on the buffer may be quoted in error messages */
    cstate->line_buf_converted = true;

    return hit_eof;
}

/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 */
static bool
CopyReadLineText(CopyState cstate)
{// #lizard forgives
    char       *copy_raw_buf;
    int            raw_buf_ptr;
    int            copy_buf_len;
    bool        need_data = false;
    bool        hit_eof = false;
    bool        result = false;
    char        mblen_str[2];

    /* CSV variables */
    bool        first_char_in_line = true;
    bool        in_quote = false,
                last_was_esc = false;
    char        quotec = '\0';
    char        escapec = '\0';

    if (cstate->csv_mode)
    {
        quotec = cstate->quote[0];
        escapec = cstate->escape[0];
        /* ignore special escape processing if it's the same as quotec */
        if (quotec == escapec)
            escapec = '\0';
    }

    mblen_str[1] = '\0';

    /*
     * The objective of this loop is to transfer the entire next input line
     * into line_buf.  Hence, we only care for detecting newlines (\r and/or
     * \n) and the end-of-copy marker (\.).
     *
     * In CSV mode, \r and \n inside a quoted field are just part of the data
     * value and are put in line_buf.  We keep just enough state to know if we
     * are currently in a quoted field or not.
     *
     * These four characters, and the CSV escape and quote characters, are
     * assumed the same in frontend and backend encodings.
     *
     * For speed, we try to move data from raw_buf to line_buf in chunks
     * rather than one character at a time.  raw_buf_ptr points to the next
     * character to examine; any characters from raw_buf_index to raw_buf_ptr
     * have been determined to be part of the line, but not yet transferred to
     * line_buf.
     *
     * For a little extra speed within the loop, we copy raw_buf and
     * raw_buf_len into local variables.
     */
    copy_raw_buf = cstate->raw_buf;
    raw_buf_ptr = cstate->raw_buf_index;
    copy_buf_len = cstate->raw_buf_len;

    for (;;)
    {
        int            prev_raw_ptr;
        char        c;

        /*
         * Load more data if needed.  Ideally we would just force four bytes
         * of read-ahead and avoid the many calls to
         * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
         * does not allow us to read too far ahead or we might read into the
         * next data, so we read-ahead only as far we know we can.  One
         * optimization would be to read-ahead four byte here if
         * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
         * considering the size of the buffer.
         */
        if (raw_buf_ptr >= copy_buf_len || need_data)
        {
            REFILL_LINEBUF;

            /*
             * Try to read some more data.  This will certainly reset
             * raw_buf_index to zero, and raw_buf_ptr must go with it.
             */
            if (!CopyLoadRawBuf(cstate))
                hit_eof = true;
            raw_buf_ptr = 0;
            copy_buf_len = cstate->raw_buf_len;

            /*
             * If we are completely out of data, break out of the loop,
             * reporting EOF.
             */
            if (copy_buf_len <= 0)
            {
                result = true;
                break;
            }
            need_data = false;
#ifdef __OPENTENBASE__
            if (cstate->internal_mode)
            {
                if (!cstate->whole_line)
                {
                    raw_buf_ptr = copy_buf_len;
                    cstate->raw_buf_index = cstate->raw_buf_len;
                    continue;
                }
                cstate->eol_type = EOL_UNKNOWN;
                return result;
            }
#endif
        }

        /* OK to fetch a character */
        prev_raw_ptr = raw_buf_ptr;
        c = copy_raw_buf[raw_buf_ptr++];

        if (cstate->csv_mode)
        {
            /*
             * If character is '\\' or '\r', we may need to look ahead below.
             * Force fetch of the next character if we don't already have it.
             * We need to do this before changing CSV state, in case one of
             * these characters is also the quote or escape character.
             *
             * Note: old-protocol does not like forced prefetch, but it's OK
             * here since we cannot validly be at EOF.
             */
            if (c == '\\' || c == '\r')
            {
                IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
            }

            /*
             * Dealing with quotes and escapes here is mildly tricky. If the
             * quote char is also the escape char, there's no problem - we
             * just use the char as a toggle. If they are different, we need
             * to ensure that we only take account of an escape inside a
             * quoted field and immediately preceding a quote char, and not
             * the second in an escape-escape sequence.
             */
            if (in_quote && c == escapec)
                last_was_esc = !last_was_esc;
            if (c == quotec && !last_was_esc)
                in_quote = !in_quote;
            if (c != escapec)
                last_was_esc = false;

            /*
             * Updating the line count for embedded CR and/or LF chars is
             * necessarily a little fragile - this test is probably about the
             * best we can do.  (XXX it's arguable whether we should do this
             * at all --- is cur_lineno a physical or logical count?)
             */
            if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
                cstate->cur_lineno++;
        }

        /* Process \r */
        if (c == '\r' && (!cstate->csv_mode || !in_quote))
        {
            /* Check for \r\n on first line, _and_ handle \r\n. */
            if (cstate->eol_type == EOL_UNKNOWN ||
                cstate->eol_type == EOL_CRNL)
            {
                /*
                 * If need more data, go back to loop top to load it.
                 *
                 * Note that if we are at EOF, c will wind up as '\0' because
                 * of the guaranteed pad of raw_buf.
                 */
                IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

                /* get next char */
                c = copy_raw_buf[raw_buf_ptr];

                if (c == '\n')
                {
                    raw_buf_ptr++;    /* eat newline */
                    cstate->eol_type = EOL_CRNL;    /* in case not set yet */
                }
                else
                {
                    /* found \r, but no \n */
                    if (cstate->eol_type == EOL_CRNL)
                        ereport(ERROR,
                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                 !cstate->csv_mode ?
                                 errmsg("literal carriage return found in data") :
                                 errmsg("unquoted carriage return found in data"),
                                 !cstate->csv_mode ?
                                 errhint("Use \"\\r\" to represent carriage return.") :
                                 errhint("Use quoted CSV field to represent carriage return.")));

                    /*
                     * if we got here, it is the first line and we didn't find
                     * \n, so don't consume the peeked character
                     */
                    cstate->eol_type = EOL_CR;
                }
            }
            else if (cstate->eol_type == EOL_NL)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         !cstate->csv_mode ?
                         errmsg("literal carriage return found in data") :
                         errmsg("unquoted carriage return found in data"),
                         !cstate->csv_mode ?
                         errhint("Use \"\\r\" to represent carriage return.") :
                         errhint("Use quoted CSV field to represent carriage return.")));
            /* If reach here, we have found the line terminator */
            break;
        }

        /* Process \n */
        if (c == '\n' && (!cstate->csv_mode || !in_quote))
        {
            if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                         !cstate->csv_mode ?
                         errmsg("literal newline found in data") :
                         errmsg("unquoted newline found in data"),
                         !cstate->csv_mode ?
                         errhint("Use \"\\n\" to represent newline.") :
                         errhint("Use quoted CSV field to represent newline.")));
            cstate->eol_type = EOL_NL;    /* in case not set yet */
            /* If reach here, we have found the line terminator */
            break;
        }

        /*
         * In CSV mode, we only recognize \. alone on a line.  This is because
         * \. is a valid CSV data value.
         */
        if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
        {
            char        c2;

            IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
            IF_NEED_REFILL_AND_EOF_BREAK(0);

            /* -----
             * get next character
             * Note: we do not change c so if it isn't \., we can fall
             * through and continue processing for file encoding.
             * -----
             */
            c2 = copy_raw_buf[raw_buf_ptr];

            if (c2 == '.')
            {
                raw_buf_ptr++;    /* consume the '.' */

                /*
                 * Note: if we loop back for more data here, it does not
                 * matter that the CSV state change checks are re-executed; we
                 * will come back here with no important state changed.
                 */
                if (cstate->eol_type == EOL_CRNL)
                {
                    /* Get the next character */
                    IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
                    /* if hit_eof, c2 will become '\0' */
                    c2 = copy_raw_buf[raw_buf_ptr++];

                    if (c2 == '\n')
                    {
                        if (!cstate->csv_mode)
                            ereport(ERROR,
                                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                     errmsg("end-of-copy marker does not match previous newline style")));
                        else
                            NO_END_OF_COPY_GOTO;
                    }
                    else if (c2 != '\r')
                    {
                        if (!cstate->csv_mode)
                            ereport(ERROR,
                                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                     errmsg("end-of-copy marker corrupt")));
                        else
                            NO_END_OF_COPY_GOTO;
                    }
                }

                /* Get the next character */
                IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
                /* if hit_eof, c2 will become '\0' */
                c2 = copy_raw_buf[raw_buf_ptr++];

                if (c2 != '\r' && c2 != '\n')
                {
                    if (!cstate->csv_mode)
                        ereport(ERROR,
                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                 errmsg("end-of-copy marker corrupt")));
                    else
                        NO_END_OF_COPY_GOTO;
                }

                if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
                    (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
                    (cstate->eol_type == EOL_CR && c2 != '\r'))
                {
                    ereport(ERROR,
                            (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                             errmsg("end-of-copy marker does not match previous newline style")));
                }

                /*
                 * Transfer only the data before the \. into line_buf, then
                 * discard the data and the \. sequence.
                 */
                if (prev_raw_ptr > cstate->raw_buf_index)
                    appendBinaryStringInfo(&cstate->line_buf,
                                           cstate->raw_buf + cstate->raw_buf_index,
                                           prev_raw_ptr - cstate->raw_buf_index);
                cstate->raw_buf_index = raw_buf_ptr;
                result = true;    /* report EOF */
                break;
            }
            else if (!cstate->csv_mode)

                /*
                 * If we are here, it means we found a backslash followed by
                 * something other than a period.  In non-CSV mode, anything
                 * after a backslash is special, so we skip over that second
                 * character too.  If we didn't do that \\. would be
                 * considered an eof-of copy, while in non-CSV mode it is a
                 * literal backslash followed by a period.  In CSV mode,
                 * backslashes are not special, so we want to process the
                 * character after the backslash just like a normal character,
                 * so we don't increment in those cases.
                 */
                raw_buf_ptr++;
        }

        /*
         * This label is for CSV cases where \. appears at the start of a
         * line, but there is more text after it, meaning it was a data value.
         * We are more strict for \. in CSV mode because \. could be a data
         * value, while in non-CSV mode, \. cannot be a data value.
         */
not_end_of_copy:

        /*
         * Process all bytes of a multi-byte character as a group.
         *
         * We only support multi-byte sequences where the first byte has the
         * high-bit set, so as an optimization we can avoid this block
         * entirely if it is not set.
         */
        if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
        {
            int            mblen;

            mblen_str[0] = c;
            /* All our encodings only read the first byte to get the length */
            mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
            IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
            IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
            raw_buf_ptr += mblen - 1;
        }
        first_char_in_line = false;
    }                            /* end of outer loop */

    /*
     * Transfer any still-uncopied data to line_buf.
     */
    REFILL_LINEBUF;

    return result;
}

/*
 * Convert a single hexadecimal digit character to its numeric value.
 *
 * Decimal digits map to 0..9; anything else is folded to lower case and
 * measured from 'a', so 'a'/'A' yield 10 and 'f'/'F' yield 15.  The
 * callers in this file check isxdigit() before calling.
 */
static int
GetDecimalFromHex(char hex)
{
    unsigned char digit = (unsigned char) hex;

    return isdigit(digit) ? hex - '0' : tolower(digit) - 'a' + 10;
}

#ifdef __OPENTENBASE__
/*
 * Split one internal-format data row into separate attributes (fields).
 *
 * The row is read from cstate->fe_msgbuf.  It consists of a 2-byte column
 * count followed, for each column, by a 4-byte length and then that many
 * bytes of data.  Both integers are in network byte order, and a length of
 * -1 marks a NULL column.
 *
 * cstate->raw_fields[k] is set to point directly into fe_msgbuf for the
 * k'th column (no copy is made), or to NULL for a NULL column.  The array
 * is expanded as necessary.
 *
 * Every size taken from the message is checked against the number of bytes
 * still available before it is used, so a truncated or corrupted row is
 * reported with elog(ERROR) rather than read past the end of the buffer.
 * A row with bytes left over after the last column is rejected as well.
 *
 * On return cstate->raw_buf_index has been advanced to cstate->raw_buf_len.
 * The return value is the number of fields read.
 */
static int
CopyReadAttributesInternal(CopyState cstate)
{
    int            fieldno;
    uint16        n16;
    uint32        n32;
    int         i;
    int         col_count;
    int         data_len = 0;
    int         msg_len = cstate->fe_msgbuf->len;
    char       *cur = cstate->fe_msgbuf->data;

    /* The message must at least hold the column count */
    if (msg_len < 2)
        elog(ERROR, "copy datarow corrupted, message length %d is too short for the column count",
                     msg_len);

    memcpy(&n16, cur, 2);
    cur += 2;
    data_len += 2;
    col_count = ntohs(n16);

    fieldno = 0;
    for (i = 0; i < col_count; i++)
    {
        int len;

        /*
         * data_len never exceeds msg_len, so the subtraction below is the
         * number of unread bytes and cannot go negative.
         */
        if (msg_len - data_len < 4)
            elog(ERROR, "copy datarow corrupted, message length %d is too short for the length of column %d",
                         msg_len, i + 1);

        /* get size */
        memcpy(&n32, cur, 4);
        cur += 4;
        data_len += 4;
        len = (int) ntohl(n32);

        if (i + 1 >= cstate->max_fields)
        {
            cstate->max_fields *= 2;
            cstate->raw_fields =
                repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
        }

        if (len == -1)
        {
            cstate->raw_fields[fieldno] = NULL;
        }
        else
        {
            /*
             * Any other negative length is invalid, and a length larger
             * than what is left would make later reads run off the buffer.
             */
            if (len < 0 || len > msg_len - data_len)
                elog(ERROR, "copy datarow corrupted, invalid data length %d for column %d",
                             len, i + 1);

            cstate->raw_fields[fieldno] = cur;
            cur += len;
            data_len += len;
        }

        fieldno++;
    }

    cstate->raw_buf_index = cstate->raw_buf_len;

    /* All columns parsed; anything left over means the row is malformed */
    if (data_len != msg_len)
    {
        elog(ERROR, "copy datarow corrupted, expected data_length %d, result data_length %d",
                     msg_len, data_len);
    }

    return fieldno;
}
#endif

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.
 *
 * The input is in line_buf.  We use attribute_buf to hold the result
 * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
 * string, or NULL when the input matches the null marker string.
 * This array is expanded as necessary.
 *
 * (Note that the caller cannot check for nulls since the returned
 * string would be the post-de-escaping equivalent, which may look
 * the same as some valid data string.)
 *
 * delim is the column delimiter string (must be just one byte for now).
 * null_print is the null marker string.  Note that this is compared to
 * the pre-de-escaped input string.
 *
 * Backslash sequences recognized here are one to three octal digits, 'x'
 * followed by one or two hex digits, and the letters b, f, n, r, t and v.
 * Any other character following a backslash is taken literally.
 *
 * If max_fields is zero or less, the line must be empty; otherwise an
 * ERRCODE_BAD_COPY_FILE_FORMAT error is reported.
 *
 * The return value is the number of fields actually read.
 */
static int
CopyReadAttributesText(CopyState cstate)
{// #lizard forgives
    char        delimc = cstate->delim[0];
    int            fieldno;
    char       *output_ptr;
    char       *cur_ptr;
    char       *line_end_ptr;

    /*
     * We need a special case for zero-column tables: check that the input
     * line is empty, and return.
     */
    if (cstate->max_fields <= 0)
    {
        if (cstate->line_buf.len != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("extra data after last expected column")));
        return 0;
    }

    resetStringInfo(&cstate->attribute_buf);

    /*
     * The de-escaped attributes will certainly not be longer than the input
     * data line, so we can just force attribute_buf to be large enough and
     * then transfer data without any checks for enough space.  We need to do
     * it this way because enlarging attribute_buf mid-stream would invalidate
     * pointers already stored into cstate->raw_fields[].
     */
    if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
        enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
    output_ptr = cstate->attribute_buf.data;

    /* set pointer variables for loop */
    cur_ptr = cstate->line_buf.data;
    line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

    /* Outer loop iterates over fields */
    fieldno = 0;
    for (;;)
    {
        bool        found_delim = false;
        char       *start_ptr;
        char       *end_ptr;
        int            input_len;
        bool        saw_non_ascii = false;

        /* Make sure there is enough space for the next value */
        if (fieldno >= cstate->max_fields)
        {
            cstate->max_fields *= 2;
            cstate->raw_fields =
                repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
        }

        /* Remember start of field on both input and output sides */
        start_ptr = cur_ptr;
        cstate->raw_fields[fieldno] = output_ptr;

        /*
         * Scan data for field.
         *
         * Note that in this loop, we are scanning to locate the end of field
         * and also speculatively performing de-escaping.  Once we find the
         * end-of-field, we can match the raw field contents against the null
         * marker string.  Only after that comparison fails do we know that
         * de-escaping is actually the right thing to do; therefore we *must
         * not* throw any syntax errors before we've done the null-marker
         * check.
         */
        for (;;)
        {
            char        c;

            /*
             * end_ptr is updated before each input character is consumed, so
             * when the loop exits it marks the end of the raw field: the
             * delimiter itself, the end of the line, or a dangling backslash.
             */
            end_ptr = cur_ptr;
            if (cur_ptr >= line_end_ptr)
                break;
            c = *cur_ptr++;
            if (c == delimc)
            {
                found_delim = true;
                break;
            }
            if (c == '\\')
            {
                /*
                 * A backslash that is the last byte of the line ends the
                 * field; it is not copied to the output, and because end_ptr
                 * still points at it, it is not counted in the raw length.
                 */
                if (cur_ptr >= line_end_ptr)
                    break;
                c = *cur_ptr++;
                switch (c)
                {
                    case '0':
                    case '1':
                    case '2':
                    case '3':
                    case '4':
                    case '5':
                    case '6':
                    case '7':
                        {
                            /* handle \013 */
                            int            val;

                            /* Consume up to two more octal digits, if present */
                            val = OCTVALUE(c);
                            if (cur_ptr < line_end_ptr)
                            {
                                c = *cur_ptr;
                                if (ISOCTAL(c))
                                {
                                    cur_ptr++;
                                    val = (val << 3) + OCTVALUE(c);
                                    if (cur_ptr < line_end_ptr)
                                    {
                                        c = *cur_ptr;
                                        if (ISOCTAL(c))
                                        {
                                            cur_ptr++;
                                            val = (val << 3) + OCTVALUE(c);
                                        }
                                    }
                                }
                            }
                            /* Keep only the low 8 bits of the octal value */
                            c = val & 0377;
                            if (c == '\0' || IS_HIGHBIT_SET(c))
                                saw_non_ascii = true;
                        }
                        break;
                    case 'x':
                        /* Handle \x3F */

                        /*
                         * One or two hex digits are accepted.  If no hex
                         * digit follows, c is left as 'x' and is emitted
                         * literally below.
                         */
                        if (cur_ptr < line_end_ptr)
                        {
                            char        hexchar = *cur_ptr;

                            if (isxdigit((unsigned char) hexchar))
                            {
                                int            val = GetDecimalFromHex(hexchar);

                                cur_ptr++;
                                if (cur_ptr < line_end_ptr)
                                {
                                    hexchar = *cur_ptr;
                                    if (isxdigit((unsigned char) hexchar))
                                    {
                                        cur_ptr++;
                                        val = (val << 4) + GetDecimalFromHex(hexchar);
                                    }
                                }
                                c = val & 0xff;
                                if (c == '\0' || IS_HIGHBIT_SET(c))
                                    saw_non_ascii = true;
                            }
                        }
                        break;
                    case 'b':
                        c = '\b';
                        break;
                    case 'f':
                        c = '\f';
                        break;
                    case 'n':
                        c = '\n';
                        break;
                    case 'r':
                        c = '\r';
                        break;
                    case 't':
                        c = '\t';
                        break;
                    case 'v':
                        c = '\v';
                        break;

                        /*
                         * in all other cases, take the char after '\'
                         * literally
                         */
                }
            }

            /* Add c to output string */
            *output_ptr++ = c;
        }

        /* Check whether raw input matched null marker */
        input_len = end_ptr - start_ptr;
        if (input_len == cstate->null_print_len &&
            strncmp(start_ptr, cstate->null_print, input_len) == 0)
            cstate->raw_fields[fieldno] = NULL;
        else
        {
            /*
             * At this point we know the field is supposed to contain data.
             *
             * If we de-escaped any non-7-bit-ASCII chars, make sure the
             * resulting string is valid data for the db encoding.
             */
            if (saw_non_ascii)
            {
                char       *fld = cstate->raw_fields[fieldno];

                pg_verifymbstr(fld, output_ptr - fld, false);
            }
        }

        /* Terminate attribute value in output area */
        *output_ptr++ = '\0';

        fieldno++;
        /* Done if we hit EOL instead of a delim */
        if (!found_delim)
            break;
    }

    /* Clean up state of attribute_buf */

    /*
     * Step back over the last field's terminator so that the recorded
     * length does not include it.
     */
    output_ptr--;
    Assert(*output_ptr == '\0');
    cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

    return fieldno;
}

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.  This has exactly the same API as
 * CopyReadAttributesText, except we parse the fields according to
 * "standard" (i.e. common) CSV usage.
 *
 * The input is in line_buf and the result strings are built in
 * attribute_buf; cstate->raw_fields[k] points at the k'th string, or is
 * NULL when the unquoted raw input equals the null marker.  A field in
 * which a quote character was seen is never treated as NULL.
 *
 * An ERRCODE_BAD_COPY_FILE_FORMAT error is reported if the line ends
 * inside a quoted section, or if max_fields is zero or less and the line
 * is not empty.
 *
 * The return value is the number of fields actually read.
 */
static int
CopyReadAttributesCSV(CopyState cstate)
{// #lizard forgives
    char        delimc = cstate->delim[0];
    char        quotec = cstate->quote[0];
    char        escapec = cstate->escape[0];
    int            fieldno;
    char       *output_ptr;
    char       *cur_ptr;
    char       *line_end_ptr;

    /*
     * We need a special case for zero-column tables: check that the input
     * line is empty, and return.
     */
    if (cstate->max_fields <= 0)
    {
        if (cstate->line_buf.len != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("extra data after last expected column")));
        return 0;
    }

    resetStringInfo(&cstate->attribute_buf);

    /*
     * The de-escaped attributes will certainly not be longer than the input
     * data line, so we can just force attribute_buf to be large enough and
     * then transfer data without any checks for enough space.  We need to do
     * it this way because enlarging attribute_buf mid-stream would invalidate
     * pointers already stored into cstate->raw_fields[].
     */
    if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
        enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
    output_ptr = cstate->attribute_buf.data;

    /* set pointer variables for loop */
    cur_ptr = cstate->line_buf.data;
    line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

    /* Outer loop iterates over fields */
    fieldno = 0;
    for (;;)
    {
        bool        found_delim = false;
        bool        saw_quote = false;
        char       *start_ptr;
        char       *end_ptr;
        int            input_len;

        /* Make sure there is enough space for the next value */
        if (fieldno >= cstate->max_fields)
        {
            cstate->max_fields *= 2;
            cstate->raw_fields =
                repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
        }

        /* Remember start of field on both input and output sides */
        start_ptr = cur_ptr;
        cstate->raw_fields[fieldno] = output_ptr;

        /*
         * Scan data for field,
         *
         * The loop starts in "not quote" mode and then toggles between that
         * and "in quote" mode. The loop exits normally if it is in "not
         * quote" mode and a delimiter or line end is seen.
         */
        for (;;)
        {
            char        c;

            /* Not in quote */
            for (;;)
            {
                /*
                 * end_ptr is set before each character is consumed, so at
                 * endfield it marks the end of the raw field (the delimiter
                 * or the end of the line).
                 */
                end_ptr = cur_ptr;
                if (cur_ptr >= line_end_ptr)
                    goto endfield;
                c = *cur_ptr++;
                /* unquoted field delimiter */
                if (c == delimc)
                {
                    found_delim = true;
                    goto endfield;
                }
                /* start of quoted field (or part of field) */
                if (c == quotec)
                {
                    saw_quote = true;
                    break;
                }
                /* Add c to output string */
                *output_ptr++ = c;
            }

            /* In quote */
            for (;;)
            {
                end_ptr = cur_ptr;
                /* Running out of input while still inside quotes is an error */
                if (cur_ptr >= line_end_ptr)
                    ereport(ERROR,
                            (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                             errmsg("unterminated CSV quoted field")));

                c = *cur_ptr++;

                /* escape within a quoted field */
                if (c == escapec)
                {
                    /*
                     * peek at the next char if available, and escape it if it
                     * is an escape char or a quote char
                     */
                    if (cur_ptr < line_end_ptr)
                    {
                        char        nextc = *cur_ptr;

                        if (nextc == escapec || nextc == quotec)
                        {
                            *output_ptr++ = nextc;
                            cur_ptr++;
                            continue;
                        }
                    }

                    /*
                     * Otherwise fall through: an escape char that does not
                     * precede an escape or quote char is handled like any
                     * other character by the tests below.
                     */
                }

                /*
                 * end of quoted field. Must do this test after testing for
                 * escape in case quote char and escape char are the same
                 * (which is the common case).
                 */
                if (c == quotec)
                    break;

                /* Add c to output string */
                *output_ptr++ = c;
            }
        }
endfield:

        /* Terminate attribute value in output area */
        *output_ptr++ = '\0';

        /* Check whether raw input matched null marker */

        /*
         * The comparison uses the raw input bytes, and is skipped entirely
         * for a field in which a quote character was seen.
         */
        input_len = end_ptr - start_ptr;
        if (!saw_quote && input_len == cstate->null_print_len &&
            strncmp(start_ptr, cstate->null_print, input_len) == 0)
            cstate->raw_fields[fieldno] = NULL;

        fieldno++;
        /* Done if we hit EOL instead of a delim */
        if (!found_delim)
            break;
    }

    /* Clean up state of attribute_buf */

    /*
     * Step back over the last field's terminator so that the recorded
     * length does not include it.
     */
    output_ptr--;
    Assert(*output_ptr == '\0');
    cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

    return fieldno;
}


/*
 * Read one attribute in binary COPY format and convert it to a Datum.
 *
 * The input is an int32 field size followed by that many bytes of data; a
 * size of -1 denotes a NULL value and carries no data.  The raw bytes are
 * loaded into attribute_buf and handed to the column type's binary receive
 * function.  On a coordinator, the size word and the raw bytes are also
 * appended to line_buf.
 *
 * *isnull is set to reflect whether the value was NULL.  Errors are
 * reported for premature end of input, a field size below -1, and a
 * receive function that does not consume the whole field.
 */
static Datum
CopyReadBinaryAttribute(CopyState cstate,
                        int column_no, FmgrInfo *flinfo,
                        Oid typioparam, int32 typmod,
                        bool *isnull)
{// #lizard forgives
    int32        fld_size;
    int            nread;
    Datum        value;

    if (!CopyGetInt32(cstate, &fld_size))
        ereport(ERROR,
                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                 errmsg("unexpected EOF in COPY data")));

#ifdef PGXC
    /*
     * Record the size word in the data row, unless it is invalid
     * (-1 is valid: it stands for a NULL value).
     */
    if (IS_PGXC_COORDINATOR && fld_size >= -1)
    {
        int32        net_size = htonl(fld_size);

        appendBinaryStringInfo(&cstate->line_buf,
                               (char *) &net_size, sizeof(int32));
    }
#endif

    if (fld_size < 0)
    {
        /* Only -1 is a legal negative size */
        if (fld_size != -1)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                     errmsg("invalid field size")));

        *isnull = true;
        return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
    }

    /* Start from an empty attribute_buf with room for the whole field */
    resetStringInfo(&cstate->attribute_buf);
    enlargeStringInfo(&cstate->attribute_buf, fld_size);

    nread = CopyGetData(cstate, cstate->attribute_buf.data,
                        fld_size, fld_size);
    if (nread != fld_size)
        ereport(ERROR,
                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                 errmsg("unexpected EOF in COPY data")));

    cstate->attribute_buf.data[fld_size] = '\0';
    cstate->attribute_buf.len = fld_size;

#ifdef PGXC
    /* Record the raw attribute bytes in the data row as well */
    if (IS_PGXC_COORDINATOR)
        appendBinaryStringInfo(&cstate->line_buf, cstate->attribute_buf.data, fld_size);
#endif

    /* Let the column type's binary input converter decode the bytes */
    value = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
                                typioparam, typmod);

    /* The converter must have consumed the buffer completely */
    if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
                 errmsg("incorrect binary data format")));

    *isnull = false;
    return value;
}

/*
 * Send text representation of one attribute, with conversion and escaping
 *
 * DUMPSOFAR() is a helper for the function below: it passes the bytes
 * between "start" and "ptr" to CopySendData, and does nothing when ptr is
 * not beyond start.  It expands to code that refers to the local variables
 * cstate, start and ptr of the function it is used in, and it does not
 * modify any of them.
 */
#define DUMPSOFAR() \
    do { \
        if (ptr > start) \
            CopySendData(cstate, start, ptr - start); \
    } while (0)

static void
CopyAttributeOutText(CopyState cstate, char *string)
{// #lizard forgives
    char       *ptr;
    char       *start;
    char        c;
    char        delimc = cstate->delim[0];

    if (cstate->need_transcoding)
        ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
    else
        ptr = string;

    /*
     * We have to grovel through the string searching for control characters
     * and instances of the delimiter character.  In most cases, though, these
     * are infrequent.  To avoid overhead from calling CopySendData once per
     * character, we dump out all characters between escaped characters in a
     * single call.  The loop invariant is that the data from "start" to "ptr"
     * can be sent literally, but hasn't yet been.
     *
     * We can skip pg_encoding_mblen() overhead when encoding is safe, because
     * in valid backend encodings, extra bytes of a multibyte character never
     * look like ASCII.  This loop is sufficiently performance-critical that
     * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
     * of the normal safe-encoding path.
     */
    if (cstate->encoding_embeds_ascii)
    {
        start = ptr;
        while ((c = *ptr) != '\0')
        {
            if ((unsigned char) c < (unsigned char) 0x20)
            {
                /*
                 * \r and \n must be escaped, the others are traditional. We
                 * prefer to dump these using the C-like notation, rather than
                 * a backslash and the literal character, because it makes the
                 * dump file a bit more proof against Microsoftish data
                 * mangling.
                 */
                switch (c)
                {
                    case '\b':
                        c = 'b';
                        break;
                    case '\f':
                        c = 'f';
                        break;
                    case '\n':
                        c = 'n';
                        break;
                    case '\r':
                        c = 'r';
                        break;
                    case '\t':
                        c = 't';
                        break;
                    case '\v':
                        c = 'v';
                        break;
                    default:
                        /* If it's the delimiter, must backslash it */
                        if (c == delimc)
                            break;
                        /* All ASCII control chars are length 1 */
                        ptr++;
                        continue;    /* fall to end of loop */
                }
                /* if we get here, we need to convert the control char */
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                CopySendChar(cstate, c);
                start = ++ptr;    /* do not include char in next run */
            }
            else if (c == '\\' || c == delimc)
            {
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                start = ptr++;    /* we include char in next run */
            }
            else if (IS_HIGHBIT_SET(c))
                ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
            else
                ptr++;
        }
    }
    else
    {
        start = ptr;
        while ((c = *ptr) != '\0')
        {
            if ((unsigned char) c < (unsigned char) 0x20)
            {
                /*
                 * \r and \n must be escaped, the others are traditional. We
                 * prefer to dump these using the C-like notation, rather than
                 * a backslash and the literal character, because it makes the
                 * dump file a bit more proof against Microsoftish data
                 * mangling.
                 */
                switch (c)
                {
                    case '\b':
                        c = 'b';
                        break;
                    case '\f':
                        c = 'f';
                        break;
                    case '\n':
                        c = 'n';
                        break;
                    case '\r':
                        c = 'r';
                        break;
                    case '\t':
                        c = 't';
                        break;
                    case '\v':
                        c = 'v';
                        break;
                    default:
                        /* If it's the delimiter, must backslash it */
                        if (c == delimc)
                            break;
                        /* All ASCII control chars are length 1 */
                        ptr++;
                        continue;    /* fall to end of loop */
                }
                /* if we get here, we need to convert the control char */
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                CopySendChar(cstate, c);
                start = ++ptr;    /* do not include char in next run */
            }
            else if (c == '\\' || c == delimc)
            {
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                start = ptr++;    /* we include char in next run */
            }
            else
                ptr++;
        }
    }

    DUMPSOFAR();
}

/*
 * Send text representation of one attribute, with conversion and
 * CSV-style escaping
 *
 * use_quote forces the value to be quoted.  Otherwise it is quoted only when
 * it equals the null_print string, when single_attr is set and the value is
 * exactly "\.", or when it contains the delimiter, the quote character, a
 * newline or a carriage return.
 *
 * Inside quotes, every quote or escape character is preceded by the escape
 * character; nothing else is altered.
 */
static void
CopyAttributeOutCSV(CopyState cstate, char *string,
                    bool use_quote, bool single_attr)
{// #lizard forgives
    char        delimc = cstate->delim[0];
    char        quotec = cstate->quote[0];
    char        escapec = cstate->escape[0];
    char       *ptr;
    char       *start;
    char        ch;

    /*
     * A value that looks like the NULL marker must be quoted.  The test is
     * made on the untranscoded string (before conversion!).
     */
    if (!use_quote)
        use_quote = (strcmp(string, cstate->null_print) == 0);

    ptr = cstate->need_transcoding
        ? pg_server_to_any(string, strlen(string), cstate->file_encoding)
        : string;

    /* Preliminary pass: find out whether the value needs quoting */
    if (!use_quote)
    {
        if (single_attr && strcmp(ptr, "\\.") == 0)
        {
            /*
             * '\.' can be a data value; standing alone on a line it would
             * be read back as the end-of-data marker, so quote it.
             */
            use_quote = true;
        }
        else
        {
            char       *scan;

            for (scan = ptr; (ch = *scan) != '\0';)
            {
                if (ch == delimc || ch == quotec || ch == '\n' || ch == '\r')
                {
                    use_quote = true;
                    break;
                }
                scan += (IS_HIGHBIT_SET(ch) && cstate->encoding_embeds_ascii)
                    ? pg_encoding_mblen(cstate->file_encoding, scan)
                    : 1;
            }
        }
    }

    if (!use_quote)
    {
        /* Nothing special in it: dump the value as-is */
        CopySendString(cstate, ptr);
        return;
    }

    CopySendChar(cstate, quotec);

    /*
     * Same run-accumulating strategy as CopyAttributeOutText: the bytes from
     * "start" up to "ptr" are pending literal output.
     */
    start = ptr;
    for (ch = *ptr; ch != '\0'; ch = *ptr)
    {
        if (ch == quotec || ch == escapec)
        {
            DUMPSOFAR();
            CopySendChar(cstate, escapec);
            start = ptr;        /* the char itself opens the next run */
        }

        if (IS_HIGHBIT_SET(ch) && cstate->encoding_embeds_ascii)
            ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
        else
            ptr++;
    }
    DUMPSOFAR();

    CopySendChar(cstate, quotec);
}

/*
 * CopyGetAttnums - build an integer list of attnums to be copied
 *
 * attnamelist is the column list written by the user, or NIL when there was
 * none.  NIL selects every non-dropped column of tupDesc, in descriptor
 * order.  With an explicit list the result follows the order of that list;
 * a name matching no non-dropped column, or a column named more than once,
 * raises an ERROR.
 *
 * rel can be NULL; it is only used to name the relation in the
 * "does not exist" error report.
 */
static List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{// #lizard forgives
    List       *result = NIL;
    ListCell   *cell;
    int         idx;

    if (attnamelist == NIL)
    {
        /* Default column list: every live column, numbered from 1 */
        for (idx = 0; idx < tupDesc->natts; idx++)
        {
            if (!tupDesc->attrs[idx]->attisdropped)
                result = lappend_int(result, idx + 1);
        }
        return result;
    }

    /* User-supplied list: resolve each name and reject bad entries */
    foreach(cell, attnamelist)
    {
        char       *colname = strVal(lfirst(cell));
        int         colno = InvalidAttrNumber;

        /* Look the name up among the non-dropped columns */
        for (idx = 0; idx < tupDesc->natts; idx++)
        {
            Form_pg_attribute att = tupDesc->attrs[idx];

            if (att->attisdropped)
                continue;
            if (namestrcmp(&(att->attname), colname) == 0)
            {
                colno = att->attnum;
                break;
            }
        }

        if (colno == InvalidAttrNumber)
        {
            /* Mention the relation in the message only when we have one */
            if (rel == NULL)
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_COLUMN),
                         errmsg("column \"%s\" does not exist",
                                colname)));
            else
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_COLUMN),
                         errmsg("column \"%s\" of relation \"%s\" does not exist",
                                colname, RelationGetRelationName(rel))));
        }

        /* The same column may not be listed twice */
        if (list_member_int(result, colno))
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_COLUMN),
                     errmsg("column \"%s\" specified more than once",
                            colname)));

        result = lappend_int(result, colno);
    }

    return result;
}


/*
 * copy_dest_startup --- executor startup
 *
 * Installed as the rStartup callback by CreateCopyDestReceiver.  Nothing is
 * set up here, so all three arguments are ignored.
 */
static void
copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
    /* no-op */
}

/*
 * copy_dest_receive --- receive one tuple
 *
 * Installed as the receiveSlot callback by CreateCopyDestReceiver.  Passes
 * the slot's values and null flags to CopyOneRowTo using the CopyState
 * stored in the receiver, then increments the receiver's "processed"
 * counter.
 *
 * Always returns true.
 */
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
    DR_copy    *receiver = (DR_copy *) self;

    /* Deconstruct the whole tuple before tts_values/tts_isnull are read */
    slot_getallattrs(slot);

    /* Emit the row (InvalidOid is passed as the OID argument) and count it */
    CopyOneRowTo(receiver->cstate, InvalidOid,
                 slot->tts_values, slot->tts_isnull);
    receiver->processed += 1;

    return true;
}

/*
 * copy_dest_shutdown --- executor end
 *
 * Installed as the rShutdown callback by CreateCopyDestReceiver.  Nothing is
 * torn down here; the receiver itself is released by copy_dest_destroy.
 */
static void
copy_dest_shutdown(DestReceiver *self)
{
    /* no-op */
}

/*
 * copy_dest_destroy --- release DestReceiver object
 *
 * Installed as the rDestroy callback by CreateCopyDestReceiver.  Only the
 * receiver struct itself is freed; the CopyState it points to is left alone.
 */
static void
copy_dest_destroy(DestReceiver *self)
{
    /* counterpart of the palloc() in CreateCopyDestReceiver */
    pfree(self);
}

/*
 * CreateCopyDestReceiver -- create a suitable DestReceiver object
 */
DestReceiver *
CreateCopyDestReceiver(void)
{
    DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));

    self->pub.receiveSlot = copy_dest_receive;
    self->pub.rStartup = copy_dest_startup;
    self->pub.rShutdown = copy_dest_shutdown;
    self->pub.rDestroy = copy_dest_destroy;
    self->pub.mydest = DestCopyOut;

    self->cstate = NULL;        /* will be set later */
    self->processed = 0;

    return (DestReceiver *) self;
}

#ifdef PGXC
/*
 * GetRemoteCopyOptions -- build a RemoteCopyOptions from a CopyState
 *
 * Starts from makeRemoteCopyOptions() and overrides it with the settings
 * found in cstate.  Strings are duplicated with pstrdup and lists with
 * list_copy, so the result does not share those objects with cstate.
 * String and list options that are not set in cstate keep whatever
 * makeRemoteCopyOptions() put there.
 */
static RemoteCopyOptions *
GetRemoteCopyOptions(CopyState cstate)
{// #lizard forgives
    RemoteCopyOptions *opts = makeRemoteCopyOptions();

    Assert(cstate);

    /* Plain flags: copied by value */
    opts->rco_binary = cstate->binary;
    opts->rco_oids = cstate->oids;
    opts->rco_csv_mode = cstate->csv_mode;
#ifdef __OPENTENBASE__
    opts->rco_insert_into = cstate->insert_into;
#endif

    /* String options: duplicate only the ones that are set */
    if (cstate->delim != NULL)
        opts->rco_delim = pstrdup(cstate->delim);
    if (cstate->null_print != NULL)
        opts->rco_null_print = pstrdup(cstate->null_print);
    if (cstate->quote != NULL)
        opts->rco_quote = pstrdup(cstate->quote);
    if (cstate->escape != NULL)
        opts->rco_escape = pstrdup(cstate->escape);

    /* List options: copy only the non-empty ones */
    if (cstate->force_quote != NIL)
        opts->rco_force_quote = list_copy(cstate->force_quote);
    if (cstate->force_notnull != NIL)
        opts->rco_force_notnull = list_copy(cstate->force_notnull);

    return opts;
}
#endif
